Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
flow_graph.h
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #define __TBB_flow_graph_H_include_area
22 
23 #include "tbb_stddef.h"
24 #include "atomic.h"
25 #include "spin_mutex.h"
26 #include "null_mutex.h"
27 #include "spin_rw_mutex.h"
28 #include "null_rw_mutex.h"
29 #include "task.h"
31 #include "tbb_exception.h"
34 #include "tbb_profiling.h"
35 #include "task_arena.h"
36 
37 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
38  #if __INTEL_COMPILER
39  // Disabled warning "routine is both inline and noinline"
40  #pragma warning (push)
41  #pragma warning( disable: 2196 )
42  #endif
43  #define __TBB_NOINLINE_SYM __attribute__((noinline))
44 #else
45  #define __TBB_NOINLINE_SYM
46 #endif
47 
48 #if __TBB_PREVIEW_ASYNC_MSG
49 #include <vector> // std::vector in internal::async_storage
50 #include <memory> // std::shared_ptr in async_msg
51 #endif
52 
53 #if __TBB_PREVIEW_STREAMING_NODE
54 // For streaming_node
55 #include <array> // std::array
56 #include <unordered_map> // std::unordered_map
57 #include <type_traits> // std::decay, std::true_type, std::false_type
58 #endif // __TBB_PREVIEW_STREAMING_NODE
59 
60 #if TBB_DEPRECATED_FLOW_ENQUEUE
61 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
62 #else
63 #define FLOW_SPAWN(a) tbb::task::spawn((a))
64 #endif
65 
66 // use the VC10 or gcc version of tuple if it is available.
67 #if __TBB_CPP11_TUPLE_PRESENT
68  #include <tuple>
69 namespace tbb {
70  namespace flow {
71  using std::tuple;
72  using std::tuple_size;
73  using std::tuple_element;
74  using std::get;
75  }
76 }
77 #else
78  #include "compat/tuple"
79 #endif
80 
81 #include<list>
82 #include<queue>
83 
94 namespace tbb {
95 namespace flow {
96 
98 enum concurrency { unlimited = 0, serial = 1 };
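// Illustrative sketch (assuming the functional nodes declared later in this header):
// these enum values are what is normally passed as the 'concurrency' argument.
//
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int> ordered (g, tbb::flow::serial,    [](int v) { return v; });
//     tbb::flow::function_node<int, int> parallel(g, tbb::flow::unlimited, [](int v) { return v; });
//
// 'serial' permits at most one concurrent invocation of the node body, 'unlimited'
// places no limit; any other positive integer is also accepted as a concurrency limit.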
99 
100 namespace interface11 {
103 struct null_type {};
104 
106 class continue_msg {};
107 
109 template< typename T > class sender;
110 template< typename T > class receiver;
113 template< typename T, typename U > class limiter_node; // needed for resetting decrementer
115 template< typename R, typename B > class run_and_put_task;
117 namespace internal {
118 
119 template<typename T, typename M> class successor_cache;
120 template<typename T, typename M> class broadcast_cache;
121 template<typename T, typename M> class round_robin_cache;
122 template<typename T, typename M> class predecessor_cache;
123 template<typename T, typename M> class reservable_predecessor_cache;
125 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
126 namespace order {
127 struct following;
128 struct preceding;
129 }
130 template<typename Order, typename... Args> struct node_set;
131 #endif
133 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
134 // Holder of edges both for caches and for those nodes which do not have predecessor caches.
135 // C == receiver< ... > or sender< ... >, depending.
136 template<typename C>
137 class edge_container {
139 public:
140  typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
141 
142  void add_edge(C &s) {
143  built_edges.push_back(&s);
144  }
146  void delete_edge(C &s) {
147  for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
148  if (*i == &s) {
149  (void)built_edges.erase(i);
150  return; // only remove one predecessor per request
151  }
152  }
153  }
155  void copy_edges(edge_list_type &v) {
156  v = built_edges;
157  }
158 
159  size_t edge_count() {
160  return (size_t)(built_edges.size());
161  }
163  void clear() {
164  built_edges.clear();
165  }
166 
 167  // These methods remove the given node from all predecessor/successor lists held in the edge
 168  // container.
169  template< typename S > void sender_extract(S &s);
170  template< typename R > void receiver_extract(R &r);
172 private:
173  edge_list_type built_edges;
174 }; // class edge_container
175 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
177 } // namespace internal
179 } // namespace interfaceX
180 } // namespace flow
181 } // namespace tbb
186 namespace tbb {
187 namespace flow {
188 namespace interface11 {
190 // enqueue left task if necessary. Returns the non-enqueued task if there is one.
191 static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
192  // if no RHS task, don't change left.
193  if (right == NULL) return left;
194  // right != NULL
195  if (left == NULL) return right;
196  if (left == SUCCESSFULLY_ENQUEUED) return right;
197  // left contains a task
198  if (right != SUCCESSFULLY_ENQUEUED) {
199  // both are valid tasks
 200  internal::spawn_in_graph_arena(g, *left);
 201  return right;
202  }
203  return left;
204 }
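// Note on the convention used by combine_tasks() and the node implementations below:
// a try_put_task()-style call returns NULL when the put failed, SUCCESSFULLY_ENQUEUED
// when the work has already been spawned or enqueued on the caller's behalf, and any
// other task pointer when the caller is still responsible for spawning that task in
// the graph's arena. combine_tasks() folds two such results into a single result that
// can be handed back to the caller.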
206 #if __TBB_PREVIEW_ASYNC_MSG
208 template < typename T > class async_msg;
210 namespace internal {
212 template < typename T > class async_storage;
214 template< typename T, typename = void >
 215 struct async_helpers {
 216  typedef async_msg<T> async_type;
217  typedef T filtered_type;
218 
219  static const bool is_async_type = false;
221  static const void* to_void_ptr(const T& t) {
222  return static_cast<const void*>(&t);
223  }
224 
225  static void* to_void_ptr(T& t) {
226  return static_cast<void*>(&t);
227  }
229  static const T& from_void_ptr(const void* p) {
230  return *static_cast<const T*>(p);
231  }
233  static T& from_void_ptr(void* p) {
234  return *static_cast<T*>(p);
235  }
236 
237  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
238  if (is_async) {
239  // This (T) is NOT async and incoming 'A<X> t' IS async
240  // Get data from async_msg
 241  const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
 242  task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
243  // finalize() must be called after subscribe() because set() can be called in finalize()
244  // and 'this_recv' client must be subscribed by this moment
245  msg.finalize();
246  return new_task;
247  }
248  else {
249  // Incoming 't' is NOT async
250  return this_recv->try_put_task(from_void_ptr(p));
251  }
252  }
253 };
255 template< typename T >
256 struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
257  typedef T async_type;
258  typedef typename T::async_msg_data_type filtered_type;
260  static const bool is_async_type = true;
262  // Receiver-classes use const interfaces
263  static const void* to_void_ptr(const T& t) {
264  return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
265  }
266 
267  static void* to_void_ptr(T& t) {
268  return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
269  }
270 
271  // Sender-classes use non-const interfaces
272  static const T& from_void_ptr(const void* p) {
273  return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
274  }
275 
276  static T& from_void_ptr(void* p) {
277  return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
278  }
280  // Used in receiver<T> class
281  static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
282  if (is_async) {
283  // Both are async
284  return this_recv->try_put_task(from_void_ptr(p));
285  }
286  else {
287  // This (T) is async and incoming 'X t' is NOT async
288  // Create async_msg for X
289  const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
290  const T msg(t);
291  return this_recv->try_put_task(msg);
292  }
293  }
294 };
297 
 298 class untyped_sender {
 299  template< typename, typename > friend class internal::predecessor_cache;
300  template< typename, typename > friend class internal::reservable_predecessor_cache;
301 public:
304 
305  virtual ~untyped_sender() {}
307  // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
308 
309  // TODO: Prevent untyped successor registration
310 
312  virtual bool register_successor( successor_type &r ) = 0;
315  virtual bool remove_successor( successor_type &r ) = 0;
318  virtual bool try_release( ) { return false; }
321  virtual bool try_consume( ) { return false; }
323 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
324  typedef internal::edge_container<successor_type> built_successors_type;
326  typedef built_successors_type::edge_list_type successor_list_type;
327  virtual built_successors_type &built_successors() = 0;
328  virtual void internal_add_built_successor( successor_type & ) = 0;
329  virtual void internal_delete_built_successor( successor_type & ) = 0;
330  virtual void copy_successors( successor_list_type &) = 0;
331  virtual size_t successor_count() = 0;
332 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
333 protected:
335  template< typename X >
336  bool try_get( X &t ) {
 337  return try_get_wrapper( async_helpers<X>::to_void_ptr(t), async_helpers<X>::is_async_type );
 338  }
341  template< typename X >
342  bool try_reserve( X &t ) {
 343  return try_reserve_wrapper( async_helpers<X>::to_void_ptr(t), async_helpers<X>::is_async_type );
 344  }
346  virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
347  virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
348 };
 350 class untyped_receiver {
 351  template< typename, typename > friend class run_and_put_task;
353  template< typename, typename > friend class internal::broadcast_cache;
354  template< typename, typename > friend class internal::round_robin_cache;
355  template< typename, typename > friend class internal::successor_cache;
357 #if __TBB_PREVIEW_OPENCL_NODE
358  template< typename, typename > friend class proxy_dependency_receiver;
359 #endif /* __TBB_PREVIEW_OPENCL_NODE */
360 public:
365  virtual ~untyped_receiver() {}
368  template<typename X>
369  bool try_put(const X& t) {
370  task *res = try_put_task(t);
371  if (!res) return false;
372  if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
373  return true;
374  }
376  // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
377 
378  // TODO: Prevent untyped predecessor registration
381  virtual bool register_predecessor( predecessor_type & ) { return false; }
384  virtual bool remove_predecessor( predecessor_type & ) { return false; }
386 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
387  typedef internal::edge_container<predecessor_type> built_predecessors_type;
388  typedef built_predecessors_type::edge_list_type predecessor_list_type;
389  virtual built_predecessors_type &built_predecessors() = 0;
390  virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
391  virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
392  virtual void copy_predecessors( predecessor_list_type & ) = 0;
393  virtual size_t predecessor_count() = 0;
394 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
395 protected:
396  template<typename X>
397  task *try_put_task(const X& t) {
 398  return try_put_task_wrapper( async_helpers<X>::to_void_ptr(t), async_helpers<X>::is_async_type );
 399  }
401  virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
403  virtual graph& graph_reference() const = 0;
405  // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
408  virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
410  virtual bool is_continue_receiver() { return false; }
411 };
413 } // namespace internal
416 template< typename T >
 417 class sender : public internal::untyped_sender {
 418 public:
420  __TBB_DEPRECATED typedef T output_type;
425  virtual bool try_get( T & ) { return false; }
428  virtual bool try_reserve( T & ) { return false; }
430 protected:
431  virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
432  // Both async OR both are NOT async
 433  if ( internal::async_helpers<T>::is_async_type == is_async ) {
 434  return try_get( internal::async_helpers<T>::from_void_ptr(p) );
 435  }
436  // Else: this (T) is async OR incoming 't' is async
437  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
438  return false;
439  }
441  virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
442  // Both async OR both are NOT async
 443  if ( internal::async_helpers<T>::is_async_type == is_async ) {
 444  return try_reserve( internal::async_helpers<T>::from_void_ptr(p) );
 445  }
446  // Else: this (T) is async OR incoming 't' is async
447  __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
448  return false;
449  }
450 }; // class sender<T>
453 template< typename T >
 454 class receiver : public internal::untyped_receiver {
 455  template< typename > friend class internal::async_storage;
456  template< typename, typename > friend struct internal::async_helpers;
457 public:
459  __TBB_DEPRECATED typedef T input_type;
465  return internal::untyped_receiver::try_put(t);
466  }
469  return internal::untyped_receiver::try_put(t);
470  }
472 protected:
473  virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
 474  return internal::async_helpers<T>::try_put_task_wrapper_impl(this, p, is_async);
 475  }
478  virtual task *try_put_task(const T& t) = 0;
480 }; // class receiver<T>
482 #else // __TBB_PREVIEW_ASYNC_MSG
483 
485 template< typename T >
486 class sender {
487 public:
489  __TBB_DEPRECATED typedef T output_type;
492  __TBB_DEPRECATED typedef receiver<T> successor_type;
494  virtual ~sender() {}
496  // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
497 
499  __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;
500 
502  __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;
505  virtual bool try_get( T & ) { return false; }
506 
508  virtual bool try_reserve( T & ) { return false; }
511  virtual bool try_release( ) { return false; }
514  virtual bool try_consume( ) { return false; }
516 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
517  __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
519  __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
520  __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
521  __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
522  __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
523  __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
524  __TBB_DEPRECATED virtual size_t successor_count() = 0;
525 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
526 }; // class sender<T>
527 
529 template< typename T >
530 class receiver {
531 public:
533  __TBB_DEPRECATED typedef T input_type;
534 
536  __TBB_DEPRECATED typedef sender<T> predecessor_type;
539  virtual ~receiver() {}
542  bool try_put( const T& t ) {
543  task *res = try_put_task(t);
544  if (!res) return false;
545  if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
546  return true;
547  }
550 protected:
551  template< typename R, typename B > friend class run_and_put_task;
552  template< typename X, typename Y > friend class internal::broadcast_cache;
553  template< typename X, typename Y > friend class internal::round_robin_cache;
554  virtual task *try_put_task(const T& t) = 0;
555  virtual graph& graph_reference() const = 0;
556 public:
557  // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
558 
560  __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
563  __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
564 
565 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
566  __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
567  __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
568  __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
569  __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
570  __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
571  __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
572  __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
573 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
574 
575 protected:
577  virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
579  template<typename TT, typename M> friend class internal::successor_cache;
580  virtual bool is_continue_receiver() { return false; }
582 #if __TBB_PREVIEW_OPENCL_NODE
583  template< typename, typename > friend class proxy_dependency_receiver;
584 #endif /* __TBB_PREVIEW_OPENCL_NODE */
585 }; // class receiver<T>
587 #endif // __TBB_PREVIEW_ASYNC_MSG
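// Illustrative usage sketch of the sender<T>/receiver<T> protocol defined above
// (assumes buffer_node and wait_for_all, defined later in this header):
//
//     tbb::flow::graph g;
//     tbb::flow::buffer_node<int> buf(g);
//     buf.try_put(42);              // receiver<int> side: push an item in
//     g.wait_for_all();
//     int v = 0;
//     bool got = buf.try_get(v);    // sender<int> side: pull an item out; got == true, v == 42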
588 
591 class continue_receiver : public receiver< continue_msg > {
592 public:
599 
 601  explicit continue_receiver(
 602  __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
603  my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
604  my_current_count = 0;
605  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
606  }
607 
 609  continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
 610  my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
611  my_current_count = 0;
612  __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
613  }
614 
 616  bool register_predecessor( predecessor_type & ) __TBB_override {
 617  spin_mutex::scoped_lock l(my_mutex);
 618  ++my_predecessor_count;
619  return true;
620  }
621 
 626  bool remove_predecessor( predecessor_type & ) __TBB_override {
 627  spin_mutex::scoped_lock l(my_mutex);
628  --my_predecessor_count;
629  return true;
630  }
632 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
633  __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
634  __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
635  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
637  __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
639  my_built_predecessors.add_edge( s );
640  }
642  __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
644  my_built_predecessors.delete_edge(s);
645  }
647  __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
649  my_built_predecessors.copy_edges(v);
650  }
652  __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
654  return my_built_predecessors.edge_count();
655  }
656 
657 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
659 protected:
660  template< typename R, typename B > friend class run_and_put_task;
661  template<typename X, typename Y> friend class internal::broadcast_cache;
662  template<typename X, typename Y> friend class internal::round_robin_cache;
 663  // The execute() body is assumed to be too small to be worth creating a separate task for.
664  task *try_put_task( const input_type & ) __TBB_override {
665  {
 666  spin_mutex::scoped_lock l(my_mutex);
 667  if ( ++my_current_count < my_predecessor_count )
 668  return SUCCESSFULLY_ENQUEUED;
 669  else
670  my_current_count = 0;
671  }
672  task * res = execute();
673  return res? res : SUCCESSFULLY_ENQUEUED;
674  }
675 
676 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
677  // continue_receiver must contain its own built_predecessors because it does
678  // not have a node_cache.
679  built_predecessors_type my_built_predecessors;
680 #endif
686  // the friend declaration in the base class did not eliminate the "protected class"
687  // error in gcc 4.1.2
688  template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
 690  void reset_receiver( reset_flags f ) __TBB_override {
 691  my_current_count = 0;
692  if (f & rf_clear_edges) {
693 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
694  my_built_predecessors.clear();
695 #endif
696  my_predecessor_count = my_initial_predecessor_count;
697  }
698  }
703  virtual task * execute() = 0;
704  template<typename TT, typename M> friend class internal::successor_cache;
705  bool is_continue_receiver() __TBB_override { return true; }
707 }; // class continue_receiver
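// Illustrative sketch of the counting protocol above, using continue_node (defined
// later in this header), whose input side is a continue_receiver:
//
//     tbb::flow::graph g;
//     tbb::flow::continue_node<tbb::flow::continue_msg> a(g, [](const tbb::flow::continue_msg&) { /* A */ });
//     tbb::flow::continue_node<tbb::flow::continue_msg> b(g, [](const tbb::flow::continue_msg&) { /* B */ });
//     tbb::flow::continue_node<tbb::flow::continue_msg> c(g, [](const tbb::flow::continue_msg&) { /* after A and B */ });
//     tbb::flow::make_edge(a, c);   // each make_edge increments c's predecessor count
//     tbb::flow::make_edge(b, c);
//     a.try_put(tbb::flow::continue_msg());
//     b.try_put(tbb::flow::continue_msg());   // c's body runs only after both signals arrive
//     g.wait_for_all();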
709 } // interfaceX
711 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
712  template <typename K, typename T>
713  K key_from_message( const T &t ) {
714  return t.key();
715  }
716 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
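// Illustrative sketch of the customization point above: a message type exposes key(),
// which the default key_from_message() forwards to a key-matching join_node
// (join_node is defined later in this header; this relies on the same preview macro):
//
//     struct order_msg {
//         int id;
//         double payload;
//         int key() const { return id; }   // picked up by key_from_message<int>(order_msg)
//     };
//     tbb::flow::join_node< tbb::flow::tuple<order_msg, order_msg>,
//                           tbb::flow::key_matching<int> > j(g);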
717 
721 } // flow
722 } // tbb
726 
727 namespace tbb {
728 namespace flow {
729 namespace interface11 {
734 #if __TBB_PREVIEW_ASYNC_MSG
736 #endif
738 
739 template <typename C, typename N>
740 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
741 {
742  if (begin) current_node = my_graph->my_nodes;
743  //else it is an end iterator by default
744 }
745 
746 template <typename C, typename N>
748  __TBB_ASSERT(current_node, "graph_iterator at end");
749  return *operator->();
750 }
751 
752 template <typename C, typename N>
754  return current_node;
755 }
757 template <typename C, typename N>
759  if (current_node) current_node = current_node->next;
760 }
762 } // namespace interfaceX
763 
764 namespace interface10 {
766 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
768  own_context = true;
769  cancelled = false;
770  caught_exception = false;
771  my_context = new task_group_context(tbb::internal::FLOW_TASKS);
775  my_is_active = true;
776 }
778 inline graph::graph(task_group_context& use_this_context) :
779  my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
781  own_context = false;
782  cancelled = false;
787  my_is_active = true;
788 }
789 
790 inline graph::~graph() {
791  wait_for_all();
793  tbb::task::destroy(*my_root_task);
794  if (own_context) delete my_context;
795  delete my_task_arena;
796 }
798 inline void graph::reserve_wait() {
799  if (my_root_task) {
802  }
803 }
804 
805 inline void graph::release_wait() {
806  if (my_root_task) {
809  }
810 }
811 
 812 inline void graph::register_node(tbb::flow::interface11::graph_node *n) {
 813  n->next = NULL;
814  {
816  n->prev = my_nodes_last;
818  my_nodes_last = n;
819  if (!my_nodes) my_nodes = n;
820  }
821 }
822 
 823 inline void graph::remove_node(tbb::flow::interface11::graph_node *n) {
 824  {
826  __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
827  if (n->prev) n->prev->next = n->next;
828  if (n->next) n->next->prev = n->prev;
829  if (my_nodes_last == n) my_nodes_last = n->prev;
830  if (my_nodes == n) my_nodes = n->next;
831  }
832  n->prev = n->next = NULL;
833 }
834 
 835 inline void graph::reset( reset_flags f ) {
 836  // reset context
838 
839  if(my_context) my_context->reset();
840  cancelled = false;
841  caught_exception = false;
842  // reset all the nodes comprising the graph
843  for(iterator ii = begin(); ii != end(); ++ii) {
844  tbb::flow::interface11::graph_node *my_p = &(*ii);
845  my_p->reset_node(f);
846  }
847  // Reattach the arena. Might be useful to run the graph in a particular task_arena
848  // while not limiting graph lifetime to a single task_arena::execute() call.
849  prepare_task_arena( /*reinit=*/true );
851  // now spawn the tasks necessary to start the graph
852  for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
854  }
855  my_reset_task_list.clear();
856 }
858 inline graph::iterator graph::begin() { return iterator(this, true); }
860 inline graph::iterator graph::end() { return iterator(this, false); }
861 
862 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
863 
864 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
865 
866 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
868 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
869 
870 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
871 inline void graph::set_name(const char *name) {
873 }
874 #endif
876 } // namespace interface10
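// Illustrative lifecycle sketch for the graph members defined above:
//
//     tbb::flow::graph g;                      // owns its task_group_context and task_arena
//     tbb::flow::continue_node<tbb::flow::continue_msg> n(g, [](const tbb::flow::continue_msg&) {});
//     n.try_put(tbb::flow::continue_msg());
//     g.wait_for_all();                        // drain every task spawned on behalf of this graph
//     g.reset();                               // rf_reset_protocol by default; see reset_flags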
878 namespace interface11 {
880 inline graph_node::graph_node(graph& g) : my_graph(g) {
881  my_graph.register_node(this);
882 }
883 
 884 inline graph_node::~graph_node() {
 885  my_graph.remove_node(this);
886 }
889 
890 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
891 using internal::node_set;
892 #endif
893 
895 template < typename Output >
896 class source_node : public graph_node, public sender< Output > {
897 public:
899  typedef Output output_type;
900 
903 
904  //Source node has no input type
906 
907 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
908  typedef typename sender<output_type>::built_successors_type built_successors_type;
909  typedef typename sender<output_type>::successor_list_type successor_list_type;
910 #endif
913  template< typename Body >
914  __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
915  : graph_node(g), my_active(is_active), init_my_active(is_active),
916  my_body( new internal::source_body_leaf< output_type, Body>(body) ),
917  my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
918  my_reserved(false), my_has_cached_item(false)
919  {
920  my_successors.set_owner(this);
921  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
922  static_cast<sender<output_type> *>(this), this->my_body );
923  }
924 
925 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
926  template <typename Body, typename... Successors>
927  source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
928  : source_node(successors.graph_reference(), body, is_active) {
929  make_edges(*this, successors);
930  }
931 #endif
935  graph_node(src.my_graph), sender<Output>(),
936  my_active(src.init_my_active),
937  init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
938  my_reserved(false), my_has_cached_item(false)
939  {
940  my_successors.set_owner(this);
941  tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
942  static_cast<sender<output_type> *>(this), this->my_body );
943  }
944 
946  ~source_node() { delete my_body; delete my_init_body; }
948 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
949  void set_name( const char *name ) __TBB_override {
951  }
952 #endif
955  bool register_successor( successor_type &r ) __TBB_override {
957  my_successors.register_successor(r);
958  if ( my_active )
959  spawn_put();
960  return true;
961  }
962 
964  bool remove_successor( successor_type &r ) __TBB_override {
966  my_successors.remove_successor(r);
967  return true;
968  }
969 
970 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
972  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
974  void internal_add_built_successor( successor_type &r) __TBB_override {
975  spin_mutex::scoped_lock lock(my_mutex);
976  my_successors.internal_add_built_successor(r);
977  }
979  void internal_delete_built_successor( successor_type &r) __TBB_override {
980  spin_mutex::scoped_lock lock(my_mutex);
981  my_successors.internal_delete_built_successor(r);
982  }
983 
984  size_t successor_count() __TBB_override {
985  spin_mutex::scoped_lock lock(my_mutex);
986  return my_successors.successor_count();
987  }
988 
989  void copy_successors(successor_list_type &v) __TBB_override {
990  spin_mutex::scoped_lock l(my_mutex);
991  my_successors.copy_successors(v);
992  }
993 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
994 
996  bool try_get( output_type &v ) __TBB_override {
998  if ( my_reserved )
999  return false;
1000 
1001  if ( my_has_cached_item ) {
1002  v = my_cached_item;
1003  my_has_cached_item = false;
1004  return true;
1005  }
1006  // we've been asked to provide an item, but we have none. enqueue a task to
1007  // provide one.
1008  spawn_put();
1009  return false;
1010  }
1011 
1013  bool try_reserve( output_type &v ) __TBB_override {
1015  if ( my_reserved ) {
1016  return false;
1017  }
1018 
1019  if ( my_has_cached_item ) {
1020  v = my_cached_item;
1021  my_reserved = true;
1022  return true;
1023  } else {
1024  return false;
1025  }
1026  }
1031  spin_mutex::scoped_lock lock(my_mutex);
1032  __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
1033  my_reserved = false;
1034  if(!my_successors.empty())
1035  spawn_put();
1036  return true;
1037  }
1038 
1041  spin_mutex::scoped_lock lock(my_mutex);
1042  __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
1043  my_reserved = false;
1044  my_has_cached_item = false;
1045  if ( !my_successors.empty() ) {
1046  spawn_put();
1047  }
1048  return true;
1049  }
1052  void activate() {
1054  my_active = true;
1055  if (!my_successors.empty())
1056  spawn_put();
1057  }
1058 
1059  template<typename Body>
1061  internal::source_body<output_type> &body_ref = *this->my_body;
1062  return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
1063  }
1065 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1066  void extract( ) __TBB_override {
1067  my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
1068  my_active = init_my_active;
1069  my_reserved = false;
1070  if(my_has_cached_item) my_has_cached_item = false;
1071  }
1072 #endif
1074 protected:
1078  my_active = init_my_active;
1079  my_reserved =false;
1080  if(my_has_cached_item) {
1081  my_has_cached_item = false;
1082  }
1083  if(f & rf_clear_edges) my_successors.clear();
1084  if(f & rf_reset_bodies) {
1085  internal::source_body<output_type> *tmp = my_init_body->clone();
1086  delete my_body;
1087  my_body = tmp;
1088  }
1089  if(my_active)
1091  }
1092 
1093 private:
1102  output_type my_cached_item;
1103 
1104  // used by apply_body_bypass, can invoke body of node.
1105  bool try_reserve_apply_body(output_type &v) {
1106  spin_mutex::scoped_lock lock(my_mutex);
1107  if ( my_reserved ) {
1108  return false;
1109  }
1110  if ( !my_has_cached_item ) {
1111  tbb::internal::fgt_begin_body( my_body );
1112  bool r = (*my_body)(my_cached_item);
1113  tbb::internal::fgt_end_body( my_body );
1114  if (r) {
1115  my_has_cached_item = true;
1116  }
1117  }
1118  if ( my_has_cached_item ) {
1119  v = my_cached_item;
1120  my_reserved = true;
1121  return true;
1122  } else {
1123  return false;
1124  }
1125  }
1126 
 1127  // When resetting a source_node that was created with my_active == true, we must
 1128  // store a task to run the node, and spawn that task only after the reset is
 1129  // complete and is_active() is again true. This is why we don't test for
 1130  // is_active() here.
1132  return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
1134  }
1135 
1137  void spawn_put( ) {
1138  if(internal::is_graph_active(this->my_graph)) {
1139  internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
1140  }
1141  }
1142 
1143  friend class internal::source_task_bypass< source_node< output_type > >;
1146  output_type v;
1147  if ( !try_reserve_apply_body(v) )
1148  return NULL;
1149 
1150  task *last_task = my_successors.try_put_task(v);
1151  if ( last_task )
1152  try_consume();
1153  else
1154  try_release();
1155  return last_task;
1156  }
1157 }; // class source_node
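// Illustrative usage sketch for source_node: the body is invoked repeatedly and
// generates items until it returns false (assumes make_edge, declared later in this header):
//
//     tbb::flow::graph g;
//     int count = 0;
//     tbb::flow::source_node<int> src(g,
//         [&count](int& v) -> bool {
//             if (count < 3) { v = count++; return true; }
//             return false;
//         },
//         /*is_active=*/false);                // construct inactive, wire edges first
//     tbb::flow::function_node<int, int> sink(g, tbb::flow::serial, [](int v) { return v; });
//     tbb::flow::make_edge(src, sink);
//     src.activate();                          // start generating once successors are attached
//     g.wait_for_all();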
1158 
1160 template < typename Input, typename Output = continue_msg, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1161 class function_node : public graph_node, public internal::function_input<Input,Output,Policy,Allocator>, public internal::function_output<Output> {
1162 public:
1163  typedef Input input_type;
1164  typedef Output output_type;
1170 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1171  typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1172  typedef typename fOutput_type::successor_list_type successor_list_type;
1173 #endif
1174  using input_impl_type::my_predecessors;
1175 
1177  // input_queue_type is allocated here, but destroyed in the function_input_base.
1178  // TODO: pass the graph_buffer_policy to the function_input_base so it can all
1179  // be done in one place. This would be an interface-breaking change.
1180  template< typename Body >
1184 #else
1186 #endif
1187  : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1188  fOutput_type(g) {
1189  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1190  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1191  }
1192 
1193 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1194  template <typename Body>
1195  function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
1196  : function_node(g, concurrency, body, Policy(), priority) {}
1197 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1198 
1199 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1200  template <typename Body, typename... Args>
1201  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
1203  : function_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1204  make_edges_in_order(nodes, *this);
1205  }
1206 
1207 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1208  template <typename Body, typename... Args>
1209  function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
1210  : function_node(nodes, concurrency, body, Policy(), priority) {}
1211 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1212 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1216  graph_node(src.my_graph),
1217  input_impl_type(src),
1218  fOutput_type(src.my_graph) {
1219  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
1220  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
1221  }
1222 
1223 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1224  void set_name( const char *name ) __TBB_override {
1225  tbb::internal::fgt_node_desc( this, name );
1226  }
1227 #endif
1228 
1229 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1230  void extract( ) __TBB_override {
1231  my_predecessors.built_predecessors().receiver_extract(*this);
1232  successors().built_successors().sender_extract(*this);
1233  }
1234 #endif
1235 
1236 protected:
1237  template< typename R, typename B > friend class run_and_put_task;
1238  template<typename X, typename Y> friend class internal::broadcast_cache;
1239  template<typename X, typename Y> friend class internal::round_robin_cache;
1240  using input_impl_type::try_put_task;
1241 
1242  internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1243 
1245  input_impl_type::reset_function_input(f);
1246  // TODO: use clear() instead.
1247  if(f & rf_clear_edges) {
1248  successors().clear();
1249  my_predecessors.clear();
1250  }
1251  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
1252  __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
1253  }
1255 }; // class function_node
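// Illustrative usage sketch: a small two-stage pipeline built from function_node
// (assumes make_edge, declared later in this header):
//
//     tbb::flow::graph g;
//     tbb::flow::function_node<int, int> square(g, tbb::flow::unlimited, [](int v) { return v * v; });
//     tbb::flow::function_node<int, int> accumulate(g, tbb::flow::serial, [](int v) {
//         /* serial concurrency: safe to touch shared state here */ return v;
//     });
//     tbb::flow::make_edge(square, accumulate);
//     for (int i = 0; i < 4; ++i) square.try_put(i);
//     g.wait_for_all();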
1256 
1258 // Output is a tuple of output types.
1259 template < typename Input, typename Output, typename Policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
1261  public graph_node,
1263  <
1264  Input,
1265  typename internal::wrap_tuple_elements<
1266  tbb::flow::tuple_size<Output>::value, // #elements in tuple
1267  internal::multifunction_output, // wrap this around each element
1268  Output // the tuple providing the types
1269  >::type,
1270  Policy,
1271  Allocator
1272  > {
1273 protected:
1275 public:
1276  typedef Input input_type;
1281 private:
1283  using input_impl_type::my_predecessors;
1284 public:
1285  template<typename Body>
1287  graph &g, size_t concurrency,
1290 #else
1292 #endif
1293  ) : graph_node(g), base_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
1294  tbb::internal::fgt_multioutput_node_with_body<N>(
1295  CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1296  &this->my_graph, static_cast<receiver<input_type> *>(this),
1297  this->output_ports(), this->my_body
1298  );
1299  }
1300 
1301 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1302  template <typename Body>
1303  __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
1304  : multifunction_node(g, concurrency, body, Policy(), priority) {}
1305 #endif // TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1306 
1307 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1308  template <typename Body, typename... Args>
1309  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
1311  : multifunction_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1312  make_edges_in_order(nodes, *this);
1313  }
1314 
1315 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1316  template <typename Body, typename... Args>
1317  __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
1318  : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
1319 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1320 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1321 
1323  graph_node(other.my_graph), base_type(other) {
1324  tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
1325  &this->my_graph, static_cast<receiver<input_type> *>(this),
1326  this->output_ports(), this->my_body );
1327  }
1328 
1329 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1330  void set_name( const char *name ) __TBB_override {
1332  }
1333 #endif
1334 
1335 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1336  void extract( ) __TBB_override {
1337  my_predecessors.built_predecessors().receiver_extract(*this);
1338  base_type::extract();
1339  }
1340 #endif
1341  // all the guts are in multifunction_input...
1342 protected:
1343  void reset_node(reset_flags f) __TBB_override { base_type::reset(f); }
1344 }; // multifunction_node
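// Illustrative usage sketch: a body that routes each input to one of two output ports
// (the port types come from the Output tuple; std::get indexes output_ports()):
//
//     typedef tbb::flow::multifunction_node<int, tbb::flow::tuple<int, int> > router_t;
//     router_t router(g, tbb::flow::unlimited,
//         [](const int& v, router_t::output_ports_type& ports) {
//             if (v % 2 == 0) std::get<0>(ports).try_put(v);   // even values
//             else            std::get<1>(ports).try_put(v);   // odd values
//         });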
1345 
1347 // successors. The node has unlimited concurrency, so it does not reject inputs.
1348 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
1349 class split_node : public graph_node, public receiver<TupleType> {
1352 public:
1353  typedef TupleType input_type;
1354  typedef Allocator allocator_type;
1355 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1356  typedef typename base_type::predecessor_type predecessor_type;
1357  typedef typename base_type::predecessor_list_type predecessor_list_type;
1358  typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
1359  typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1360 #endif
1361 
1362  typedef typename internal::wrap_tuple_elements<
1363  N, // #elements in tuple
1364  internal::multifunction_output, // wrap this around each element
1365  TupleType // the tuple providing the types
1367 
1369  : graph_node(g),
1370  my_output_ports(internal::init_output_ports<output_ports_type>::call(g, my_output_ports))
1371  {
1372  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1373  static_cast<receiver<input_type> *>(this), this->output_ports());
1374  }
1375 
1376 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1377  template <typename... Args>
1378  __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
1379  make_edges_in_order(nodes, *this);
1380  }
1381 #endif
1382 
1383  __TBB_NOINLINE_SYM split_node(const split_node& other)
1384  : graph_node(other.my_graph), base_type(other),
1385  my_output_ports(internal::init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
1386  {
1387  tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
1388  static_cast<receiver<input_type> *>(this), this->output_ports());
1389  }
1390 
1391 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1392  void set_name( const char *name ) __TBB_override {
1394  }
1395 #endif
1396 
1397  output_ports_type &output_ports() { return my_output_ports; }
1398 
1399 protected:
1400  task *try_put_task(const TupleType& t) __TBB_override {
 1401  // Sending split messages in parallel is not justified, as overheads would prevail.
 1402  // Also, we do not have successors here, so we simply report the operation as successful.
1403  return internal::emit_element<N>::emit_this(this->my_graph, t, output_ports());
1404  }
1406  if (f & rf_clear_edges)
1407  internal::clear_element<N>::clear_this(my_output_ports);
1408 
1409  __TBB_ASSERT(!(f & rf_clear_edges) || internal::clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
1410  }
1413  return my_graph;
1414  }
1415 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1416 private:
1417  void extract() __TBB_override {}
1418 
1420  void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
1421 
1423  void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
1424 
1425  size_t predecessor_count() __TBB_override { return 0; }
1426 
1427  void copy_predecessors(predecessor_list_type&) __TBB_override {}
1429  built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
1430 
1432  built_predecessors_type my_predessors;
1433 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1434 
1435 private:
1436  output_ports_type my_output_ports;
1437 };
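// Illustrative usage sketch: split_node forwards each element of an incoming tuple
// to the corresponding output port (assumes queue_node, make_edge and output_port<>,
// declared later in this header):
//
//     typedef tbb::flow::tuple<int, double> pair_t;
//     tbb::flow::split_node<pair_t> splitter(g);
//     tbb::flow::queue_node<int>    ints(g);
//     tbb::flow::queue_node<double> reals(g);
//     tbb::flow::make_edge(tbb::flow::output_port<0>(splitter), ints);
//     tbb::flow::make_edge(tbb::flow::output_port<1>(splitter), reals);
//     splitter.try_put(pair_t(1, 2.5));
//     g.wait_for_all();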
1438 
1440 template <typename Output, typename Policy = internal::Policy<void> >
1441 class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
1442  public internal::function_output<Output> {
1443 public:
1445  typedef Output output_type;
1450 
1452  template <typename Body >
1454  graph &g,
1456  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
1457 #else
1459 #endif
1460  ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
1461  fOutput_type(g) {
1462  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1463 
1464  static_cast<receiver<input_type> *>(this),
1465  static_cast<sender<output_type> *>(this), this->my_body );
1466  }
1467 
1468 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1469  template <typename Body>
1470  continue_node( graph& g, Body body, node_priority_t priority )
1471  : continue_node(g, body, Policy(), priority) {}
1472 #endif
1473 
1474 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1475  template <typename Body, typename... Args>
1476  continue_node( const node_set<Args...>& nodes, Body body,
1478  : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
1479  make_edges_in_order(nodes, *this);
1480  }
1481 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1482  template <typename Body, typename... Args>
1483  continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
1484  : continue_node(nodes, body, Policy(), priority) {}
1485 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1486 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1487 
1489  template <typename Body >
1491  graph &g, int number_of_predecessors,
1494 #else
1496 #endif
1497  ) : graph_node(g)
1498  , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
1499  fOutput_type(g) {
1500  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1501  static_cast<receiver<input_type> *>(this),
1502  static_cast<sender<output_type> *>(this), this->my_body );
1503  }
1504 
1505 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
1506  template <typename Body>
1507  continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
1508  : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
1509 #endif
1511 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1512  template <typename Body, typename... Args>
1513  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1515  : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
1516  make_edges_in_order(nodes, *this);
1517  }
1518 
1519 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
1520  template <typename Body, typename... Args>
1521  continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1522  Body body, node_priority_t priority )
1523  : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
1524 #endif
1525 #endif
1526 
1528  __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
1529  graph_node(src.my_graph), input_impl_type(src),
1531  tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
1532  static_cast<receiver<input_type> *>(this),
1533  static_cast<sender<output_type> *>(this), this->my_body );
1534  }
1535 
1536 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1537  void set_name( const char *name ) __TBB_override {
1538  tbb::internal::fgt_node_desc( this, name );
1539  }
1540 #endif
1541 
1542 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1543  void extract() __TBB_override {
1544  input_impl_type::my_built_predecessors.receiver_extract(*this);
1545  successors().built_successors().sender_extract(*this);
1546  }
1547 #endif
1549 protected:
1550  template< typename R, typename B > friend class run_and_put_task;
1551  template<typename X, typename Y> friend class internal::broadcast_cache;
1552  template<typename X, typename Y> friend class internal::round_robin_cache;
1553  using input_impl_type::try_put_task;
1554  internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
1555 
1557  input_impl_type::reset_receiver(f);
1558  if(f & rf_clear_edges)successors().clear();
1559  __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1560  }
1561 }; // continue_node
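// Illustrative sketch of the explicit-predecessor-count constructor above: the body
// fires only after the given number of continue_msg signals have been received.
//
//     tbb::flow::continue_node<tbb::flow::continue_msg> barrier(g, /*number_of_predecessors=*/3,
//         [](const tbb::flow::continue_msg&) { /* runs after three signals */ });
//     barrier.try_put(tbb::flow::continue_msg());
//     barrier.try_put(tbb::flow::continue_msg());
//     barrier.try_put(tbb::flow::continue_msg());   // third signal triggers the body
//     g.wait_for_all();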
1564 template <typename T>
1565 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1566 public:
1567  typedef T input_type;
1568  typedef T output_type;
1569  typedef typename receiver<input_type>::predecessor_type predecessor_type;
1571 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1572  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1573  typedef typename sender<output_type>::successor_list_type successor_list_type;
1574 #endif
1575 private:
1577 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1578  internal::edge_container<predecessor_type> my_built_predecessors;
1579  spin_mutex pred_mutex; // serialize accesses on edge_container
1580 #endif
1581 public:
1584  my_successors.set_owner( this );
1585  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1586  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1587  }
1588 
1589 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1590  template <typename... Args>
1591  broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1592  make_edges_in_order(nodes, *this);
1593  }
1594 #endif
1595 
1596  // Copy constructor
1597  __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) :
1599  {
1600  my_successors.set_owner( this );
1601  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
1602  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1603  }
1604 
1605 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
1606  void set_name( const char *name ) __TBB_override {
1607  tbb::internal::fgt_node_desc( this, name );
1608  }
1609 #endif
1612  bool register_successor( successor_type &r ) __TBB_override {
1613  my_successors.register_successor( r );
1614  return true;
1615  }
1616 
1618  bool remove_successor( successor_type &r ) __TBB_override {
1619  my_successors.remove_successor( r );
1620  return true;
1621  }
1623 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1624  typedef typename sender<T>::built_successors_type built_successors_type;
1626  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1627 
1628  void internal_add_built_successor(successor_type &r) __TBB_override {
1629  my_successors.internal_add_built_successor(r);
1630  }
1631 
1632  void internal_delete_built_successor(successor_type &r) __TBB_override {
1633  my_successors.internal_delete_built_successor(r);
1634  }
1635 
1636  size_t successor_count() __TBB_override {
1637  return my_successors.successor_count();
1638  }
1639 
1640  void copy_successors(successor_list_type &v) __TBB_override {
1641  my_successors.copy_successors(v);
1642  }
1643 
1644  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1645 
1646  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1648  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
1649  spin_mutex::scoped_lock l(pred_mutex);
1650  my_built_predecessors.add_edge(p);
1651  }
1652 
1653  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
1655  my_built_predecessors.delete_edge(p);
1656  }
1658  size_t predecessor_count() __TBB_override {
1660  return my_built_predecessors.edge_count();
1661  }
1663  void copy_predecessors(predecessor_list_type &v) __TBB_override {
1665  my_built_predecessors.copy_edges(v);
1666  }
1668  void extract() __TBB_override {
1669  my_built_predecessors.receiver_extract(*this);
1670  my_successors.built_successors().sender_extract(*this);
1671  }
1672 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1673 
1674 protected:
1675  template< typename R, typename B > friend class run_and_put_task;
1676  template<typename X, typename Y> friend class internal::broadcast_cache;
1677  template<typename X, typename Y> friend class internal::round_robin_cache;
1680  task *new_task = my_successors.try_put_task(t);
1681  if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1682  return new_task;
1683  }
1684 
1686  return my_graph;
1687  }
1688 
1690 
1692  if (f&rf_clear_edges) {
1693  my_successors.clear();
1694 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1695  my_built_predecessors.clear();
1696 #endif
1697  }
1698  __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1699  }
1700 }; // broadcast_node
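// Illustrative usage sketch: broadcast_node forwards each message to every successor
// without buffering (assumes make_edge, declared later in this header):
//
//     tbb::flow::broadcast_node<int> fanout(g);
//     tbb::flow::function_node<int, int> sink1(g, tbb::flow::unlimited, [](int v) { return v; });
//     tbb::flow::function_node<int, int> sink2(g, tbb::flow::unlimited, [](int v) { return v; });
//     tbb::flow::make_edge(fanout, sink1);
//     tbb::flow::make_edge(fanout, sink2);
//     fanout.try_put(7);                        // both sink1 and sink2 receive 7
//     g.wait_for_all();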
1703 template <typename T, typename A=cache_aligned_allocator<T> >
1704 class buffer_node : public graph_node, public internal::reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
1705 public:
1706  typedef T input_type;
1707  typedef T output_type;
1711 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1712  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
1713  typedef typename sender<output_type>::successor_list_type successor_list_type;
1714 #endif
1715 protected:
1716  typedef size_t size_type;
1718 
1719 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1720  internal::edge_container<predecessor_type> my_built_predecessors;
1721 #endif
1722 
1724 
1725  enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1726 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1727  , add_blt_succ, del_blt_succ,
1728  add_blt_pred, del_blt_pred,
1729  blt_succ_cnt, blt_pred_cnt,
1730  blt_succ_cpy, blt_pred_cpy // create vector copies of preds and succs
1731 #endif
1732  };
1733 
1734  // implements the aggregator_operation concept
1735  class buffer_operation : public internal::aggregated_operation< buffer_operation > {
1736  public:
1737  char type;
1738 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1739  task * ltask;
1740  union {
1741  input_type *elem;
1742  successor_type *r;
1743  predecessor_type *p;
1744  size_t cnt_val;
1745  successor_list_type *svec;
1746  predecessor_list_type *pvec;
1747  };
1748 #else
1749  T *elem;
1750  task * ltask;
1751  successor_type *r;
1752 #endif
1753  buffer_operation(const T& e, op_type t) : type(char(t))
1755 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1756  , ltask(NULL), elem(const_cast<T*>(&e))
1757 #else
1758  , elem(const_cast<T*>(&e)) , ltask(NULL)
1759 #endif
1760  {}
1761  buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
1762  };
1765  typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
1766  friend class internal::aggregating_functor<class_type, buffer_operation>;
1767  internal::aggregator< handler_type, buffer_operation> my_aggregator;
1769  virtual void handle_operations(buffer_operation *op_list) {
1770  handle_operations_impl(op_list, this);
1771  }
1773  template<typename derived_type>
1774  void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1775  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1776 
1777  buffer_operation *tmp = NULL;
1778  bool try_forwarding = false;
1779  while (op_list) {
1780  tmp = op_list;
1781  op_list = op_list->next;
1782  switch (tmp->type) {
1783  case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1784  case rem_succ: internal_rem_succ(tmp); break;
1785  case req_item: internal_pop(tmp); break;
1786  case res_item: internal_reserve(tmp); break;
1787  case rel_res: internal_release(tmp); try_forwarding = true; break;
1788  case con_res: internal_consume(tmp); try_forwarding = true; break;
1789  case put_item: try_forwarding = internal_push(tmp); break;
1790  case try_fwd_task: internal_forward_task(tmp); break;
1791 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1792  // edge recording
1793  case add_blt_succ: internal_add_built_succ(tmp); break;
1794  case del_blt_succ: internal_del_built_succ(tmp); break;
1795  case add_blt_pred: internal_add_built_pred(tmp); break;
1796  case del_blt_pred: internal_del_built_pred(tmp); break;
1797  case blt_succ_cnt: internal_succ_cnt(tmp); break;
1798  case blt_pred_cnt: internal_pred_cnt(tmp); break;
1799  case blt_succ_cpy: internal_copy_succs(tmp); break;
1800  case blt_pred_cpy: internal_copy_preds(tmp); break;
1801 #endif
1802  }
1803  }
1805  derived->order();
1807  if (try_forwarding && !forwarder_busy) {
1809  forwarder_busy = true;
1810  task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
1813  // tmp should point to the last item handled by the aggregator. This is the operation
1814  // the handling thread enqueued. So modifying that record will be okay.
1815  // workaround for icc bug
1816  tbb::task *z = tmp->ltask;
1817  graph &g = this->my_graph;
1818  tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
1819  }
1820  }
1821  } // handle_operations
1824  return op_data.ltask;
1825  }
1827  inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1828  task *ft = grab_forwarding_task(op_data);
1829  if(ft) {
1830  internal::spawn_in_graph_arena(graph_reference(), *ft);
1831  return true;
1832  }
1833  return false;
1834  }
1835 
1837  virtual task *forward_task() {
1838  buffer_operation op_data(try_fwd_task);
1839  task *last_task = NULL;
1840  do {
1841  op_data.status = internal::WAIT;
1842  op_data.ltask = NULL;
1843  my_aggregator.execute(&op_data);
1844 
1845  // workaround for icc bug
1846  tbb::task *xtask = op_data.ltask;
1847  graph& g = this->my_graph;
1848  last_task = combine_tasks(g, last_task, xtask);
1849  } while (op_data.status ==internal::SUCCEEDED);
1850  return last_task;
1851  }
1852 
1855  my_successors.register_successor(*(op->r));
1857  }
1861  my_successors.remove_successor(*(op->r));
1863  }
1865 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1866  typedef typename sender<T>::built_successors_type built_successors_type;
1868  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1870  virtual void internal_add_built_succ(buffer_operation *op) {
1871  my_successors.internal_add_built_successor(*(op->r));
1873  }
1875  virtual void internal_del_built_succ(buffer_operation *op) {
1876  my_successors.internal_delete_built_successor(*(op->r));
1878  }
1880  typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
1882  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
1884  virtual void internal_add_built_pred(buffer_operation *op) {
1885  my_built_predecessors.add_edge(*(op->p));
1887  }
1889  virtual void internal_del_built_pred(buffer_operation *op) {
1890  my_built_predecessors.delete_edge(*(op->p));
1892  }
1893 
1894  virtual void internal_succ_cnt(buffer_operation *op) {
1895  op->cnt_val = my_successors.successor_count();
1897  }
1898 
1899  virtual void internal_pred_cnt(buffer_operation *op) {
1900  op->cnt_val = my_built_predecessors.edge_count();
1902  }
1903 
1904  virtual void internal_copy_succs(buffer_operation *op) {
1905  my_successors.copy_successors(*(op->svec));
1907  }
1909  virtual void internal_copy_preds(buffer_operation *op) {
1910  my_built_predecessors.copy_edges(*(op->pvec));
1912  }
1913 
1914 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1915 
1916 private:
1917  void order() {}
1918 
1919  bool is_item_valid() {
1920  return this->my_item_valid(this->my_tail - 1);
1921  }
1923  void try_put_and_add_task(task*& last_task) {
1924  task *new_task = my_successors.try_put_task(this->back());
1925  if (new_task) {
1926  // workaround for icc bug
1927  graph& g = this->my_graph;
1928  last_task = combine_tasks(g, last_task, new_task);
1929  this->destroy_back();
1930  }
1931  }
1933 protected:
1935  virtual void internal_forward_task(buffer_operation *op) {
1936  internal_forward_task_impl(op, this);
1937  }
1939  template<typename derived_type>
1940  void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1941  __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1943  if (this->my_reserved || !derived->is_item_valid()) {
1944  __TBB_store_with_release(op->status, internal::FAILED);
1945  this->forwarder_busy = false;
1946  return;
1947  }
1948  // Try forwarding, giving each successor a chance
1949  task * last_task = NULL;
1950  size_type counter = my_successors.size();
1951  for (; counter > 0 && derived->is_item_valid(); --counter)
1952  derived->try_put_and_add_task(last_task);
1954  op->ltask = last_task; // return task
1955  if (last_task && !counter) {
1956  __TBB_store_with_release(op->status, internal::SUCCEEDED);
1957  }
1958  else {
1959  __TBB_store_with_release(op->status, internal::FAILED);
1960  forwarder_busy = false;
1961  }
1962  }
1963 
1964  virtual bool internal_push(buffer_operation *op) {
1965  this->push_back(*(op->elem));
1966  __TBB_store_with_release(op->status, internal::SUCCEEDED);
1967  return true;
1968  }
1969 
1970  virtual void internal_pop(buffer_operation *op) {
1971  if(this->pop_back(*(op->elem))) {
1972  __TBB_store_with_release(op->status, internal::SUCCEEDED);
1973  }
1974  else {
1975  __TBB_store_with_release(op->status, internal::FAILED);
1976  }
1977  }
1978 
1979  virtual void internal_reserve(buffer_operation *op) {
1980  if(this->reserve_front(*(op->elem))) {
1981  __TBB_store_with_release(op->status, internal::SUCCEEDED);
1982  }
1983  else {
1984  __TBB_store_with_release(op->status, internal::FAILED);
1985  }
1986  }
1987 
1988  virtual void internal_consume(buffer_operation *op) {
1989  this->consume_front();
1990  __TBB_store_with_release(op->status, internal::SUCCEEDED);
1991  }
1992 
1993  virtual void internal_release(buffer_operation *op) {
1994  this->release_front();
1995  __TBB_store_with_release(op->status, internal::SUCCEEDED);
1996  }
1997 
1998 public:
2000  __TBB_NOINLINE_SYM explicit buffer_node( graph &g ) : graph_node(g), internal::reservable_item_buffer<T>(),
2001  forwarder_busy(false) {
2002  my_successors.set_owner(this);
2003  my_aggregator.initialize_handler(handler_type(this));
2004  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2005  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2006  }
2007 
2008 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2009  template <typename... Args>
2010  buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
2011  make_edges_in_order(nodes, *this);
2012  }
2013 #endif
2014 
2016  __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
2017  internal::reservable_item_buffer<T>(), receiver<T>(), sender<T>() {
2018  forwarder_busy = false;
2019  my_successors.set_owner(this);
2020  my_aggregator.initialize_handler(handler_type(this));
2021  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
2022  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
2023  }
2024 
2025 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2026  void set_name( const char *name ) __TBB_override {
2027  tbb::internal::fgt_node_desc( this, name );
2028  }
2029 #endif
2030 
2031  //
2032  // message sender implementation
2033  //
2034 
2036 
2037  bool register_successor( successor_type &r ) __TBB_override {
2038  buffer_operation op_data(reg_succ);
2039  op_data.r = &r;
2040  my_aggregator.execute(&op_data);
2041  (void)enqueue_forwarding_task(op_data);
2042  return true;
2043  }
2044 
2045 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2046  void internal_add_built_successor( successor_type &r) __TBB_override {
2047  buffer_operation op_data(add_blt_succ);
2048  op_data.r = &r;
2049  my_aggregator.execute(&op_data);
2050  }
2051 
2052  void internal_delete_built_successor( successor_type &r) __TBB_override {
2053  buffer_operation op_data(del_blt_succ);
2054  op_data.r = &r;
2055  my_aggregator.execute(&op_data);
2056  }
2057 
2058  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
2059  buffer_operation op_data(add_blt_pred);
2060  op_data.p = &p;
2061  my_aggregator.execute(&op_data);
2062  }
2063 
2064  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
2065  buffer_operation op_data(del_blt_pred);
2066  op_data.p = &p;
2067  my_aggregator.execute(&op_data);
2068  }
2069 
2070  size_t predecessor_count() __TBB_override {
2071  buffer_operation op_data(blt_pred_cnt);
2072  my_aggregator.execute(&op_data);
2073  return op_data.cnt_val;
2074  }
2075 
2076  size_t successor_count() __TBB_override {
2077  buffer_operation op_data(blt_succ_cnt);
2078  my_aggregator.execute(&op_data);
2079  return op_data.cnt_val;
2080  }
2081 
2082  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
2083  buffer_operation op_data(blt_pred_cpy);
2084  op_data.pvec = &v;
2085  my_aggregator.execute(&op_data);
2086  }
2087 
2088  void copy_successors( successor_list_type &v ) __TBB_override {
2089  buffer_operation op_data(blt_succ_cpy);
2090  op_data.svec = &v;
2091  my_aggregator.execute(&op_data);
2092  }
2093 
2094 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2095 
2097 
2099  bool remove_successor( successor_type &r ) __TBB_override {
2100  r.remove_predecessor(*this);
2101  buffer_operation op_data(rem_succ);
2102  op_data.r = &r;
2103  my_aggregator.execute(&op_data);
2104  // even though this operation does not cause a forward, if we are the handler, and
2105  // a forward is scheduled, we may be the first to reach this point after the aggregator,
2106  // and so should check for the task.
2107  (void)enqueue_forwarding_task(op_data);
2108  return true;
2109  }
2110 
2112 
2114  bool try_get( T &v ) __TBB_override {
2115  buffer_operation op_data(req_item);
2116  op_data.elem = &v;
2117  my_aggregator.execute(&op_data);
2118  (void)enqueue_forwarding_task(op_data);
2119  return (op_data.status == internal::SUCCEEDED);
2120  }
2121 
2123 
2125  bool try_reserve( T &v ) __TBB_override {
2126  buffer_operation op_data(res_item);
2127  op_data.elem = &v;
2128  my_aggregator.execute(&op_data);
2129  (void)enqueue_forwarding_task(op_data);
2130  return (op_data.status == internal::SUCCEEDED);
2131  }
2132 
2134 
2135  bool try_release() __TBB_override {
2136  buffer_operation op_data(rel_res);
2137  my_aggregator.execute(&op_data);
2138  (void)enqueue_forwarding_task(op_data);
2139  return true;
2140  }
2141 
2143 
2144  bool try_consume() __TBB_override {
2145  buffer_operation op_data(con_res);
2146  my_aggregator.execute(&op_data);
2147  (void)enqueue_forwarding_task(op_data);
2148  return true;
2149  }
2150 
2151 protected:
2152 
2153  template< typename R, typename B > friend class run_and_put_task;
2154  template<typename X, typename Y> friend class internal::broadcast_cache;
2155  template<typename X, typename Y> friend class internal::round_robin_cache;
2156  //! receive an item, return a task *if possible
2157  task *try_put_task(const T &t) __TBB_override {
2158  buffer_operation op_data(t, put_item);
2159  my_aggregator.execute(&op_data);
2160  task *ft = grab_forwarding_task(op_data);
2161  // sequencer_nodes can return failure (if an item has been previously inserted)
2162  // We have to spawn the returned task if our own operation fails.
2163 
2164  if(ft && op_data.status == internal::FAILED) {
2165  // we haven't succeeded queueing the item, but for some reason the
2166  // call returned a task (if another request resulted in a successful
2167  // forward this could happen.) Queue the task and reset the pointer.
2168  internal::spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
2169  }
2170  else if(!ft && op_data.status == internal::SUCCEEDED) {
2171  ft = SUCCESSFULLY_ENQUEUED;
2172  }
2173  return ft;
2174  }
2175 
2176  graph& graph_reference() __TBB_override {
2177  return my_graph;
2178  }
2179 
2181 
2182 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2183 public:
2184  void extract() __TBB_override {
2185  my_built_predecessors.receiver_extract(*this);
2186  my_successors.built_successors().sender_extract(*this);
2187  }
2188 #endif
2189 
2190 protected:
2192  void reset_node( reset_flags f) __TBB_override {
2193  // TODO: just clear structures
2194  if (f & rf_clear_edges) {
2195  my_successors.clear();
2196 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2197  my_built_predecessors.clear();
2198 #endif
2199  }
2200  forwarder_busy = false;
2201  }
2202 }; // buffer_node
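// A minimal usage sketch for buffer_node, assuming the public tbb::flow API (graph, try_put,
// try_get); the names g, b and v are illustrative only. A buffer_node forwards each item to a
// single successor in no particular order, or holds items for explicit retrieval:
//
//   tbb::flow::graph g;
//   tbb::flow::buffer_node<int> b(g);
//   b.try_put(1);
//   b.try_put(2);
//   g.wait_for_all();
//   int v = 0;
//   while (b.try_get(v)) { /* v is 1 or 2; retrieval order is unspecified */ }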
2203 
2205 template <typename T, typename A=cache_aligned_allocator<T> >
2206 class queue_node : public buffer_node<T, A> {
2207 protected:
2208  typedef buffer_node<T, A> base_type;
2209  typedef typename base_type::size_type size_type;
2210  typedef typename base_type::buffer_operation queue_operation;
2212 
2213 private:
2214  template<typename, typename> friend class buffer_node;
2215 
2216  bool is_item_valid() {
2217  return this->my_item_valid(this->my_head);
2218  }
2219 
2220  void try_put_and_add_task(task*& last_task) {
2221  task *new_task = this->my_successors.try_put_task(this->front());
2222  if (new_task) {
2223  // workaround for icc bug
2224  graph& graph_ref = this->graph_reference();
2225  last_task = combine_tasks(graph_ref, last_task, new_task);
2226  this->destroy_front();
2227  }
2228  }
2229 
2230 protected:
2231  void internal_forward_task(queue_operation *op) __TBB_override {
2232  this->internal_forward_task_impl(op, this);
2233  }
2234 
2235  void internal_pop(queue_operation *op) __TBB_override {
2236  if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2237  __TBB_store_with_release(op->status, internal::FAILED);
2238  }
2239  else {
2240  this->pop_front(*(op->elem));
2241  __TBB_store_with_release(op->status, internal::SUCCEEDED);
2242  }
2243  }
2244  void internal_reserve(queue_operation *op) __TBB_override {
2245  if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2246  __TBB_store_with_release(op->status, internal::FAILED);
2247  }
2248  else {
2249  this->reserve_front(*(op->elem));
2250  __TBB_store_with_release(op->status, internal::SUCCEEDED);
2251  }
2252  }
2253  void internal_consume(queue_operation *op) __TBB_override {
2254  this->consume_front();
2255  __TBB_store_with_release(op->status, internal::SUCCEEDED);
2256  }
2257 
2258 public:
2259  typedef T input_type;
2260  typedef T output_type;
2263 
2265  __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
2266  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2267  static_cast<receiver<input_type> *>(this),
2268  static_cast<sender<output_type> *>(this) );
2269  }
2270 
2271 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2272  template <typename... Args>
2273  queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
2274  make_edges_in_order(nodes, *this);
2275  }
2276 #endif
2277 
2279  __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
2280  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
2281  static_cast<receiver<input_type> *>(this),
2282  static_cast<sender<output_type> *>(this) );
2283  }
2284 
2285 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2286  void set_name( const char *name ) __TBB_override {
2287  tbb::internal::fgt_node_desc( this, name );
2288  }
2289 #endif
2290 
2291 protected:
2292  void reset_node(reset_flags f) __TBB_override {
2293  base_type::reset_node(f);
2294  }
2295 }; // queue_node
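// A usage sketch for queue_node, assuming the same public API as above; names are illustrative.
// In contrast to buffer_node, items are forwarded and retrieved strictly in FIFO order:
//
//   tbb::flow::graph g;
//   tbb::flow::queue_node<int> q(g);
//   for (int i = 0; i < 3; ++i) q.try_put(i);
//   g.wait_for_all();
//   int v = 0;
//   while (q.try_get(v)) { /* yields 0, then 1, then 2 */ }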
2296 
2298 template< typename T, typename A=cache_aligned_allocator<T> >
2299 class sequencer_node : public queue_node<T, A> {
2300  internal::function_body< T, size_t > *my_sequencer;
2301  // my_sequencer should be a benign function and must be callable
2302  // from a parallel context. Does this mean it needn't be reset?
2303 public:
2304  typedef T input_type;
2305  typedef T output_type;
2308 
2310  template< typename Sequencer >
2311  __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
2312  my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2313  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2314  static_cast<receiver<input_type> *>(this),
2315  static_cast<sender<output_type> *>(this) );
2316  }
2317 
2318 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2319  template <typename Sequencer, typename... Args>
2320  sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
2321  : sequencer_node(nodes.graph_reference(), s) {
2322  make_edges_in_order(nodes, *this);
2323  }
2324 #endif
2325 
2327  __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
2328  my_sequencer( src.my_sequencer->clone() ) {
2329  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
2330  static_cast<receiver<input_type> *>(this),
2331  static_cast<sender<output_type> *>(this) );
2332  }
2333 
2335  ~sequencer_node() { delete my_sequencer; }
2336 
2337 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2338  void set_name( const char *name ) __TBB_override {
2339  tbb::internal::fgt_node_desc( this, name );
2340  }
2341 #endif
2342 
2343 protected:
2344  typedef typename buffer_node<T, A>::size_type size_type;
2345  typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
2346 
2347 private:
2348  bool internal_push(sequencer_operation *op) __TBB_override {
2349  size_type tag = (*my_sequencer)(*(op->elem));
2350 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
2351  if (tag < this->my_head) {
2352  // have already emitted a message with this tag
2353  __TBB_store_with_release(op->status, internal::FAILED);
2354  return false;
2355  }
2356 #endif
2357  // cannot modify this->my_tail now; the buffer would be inconsistent.
2358  size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2359 
2360  if (this->size(new_tail) > this->capacity()) {
2361  this->grow_my_array(this->size(new_tail));
2362  }
2363  this->my_tail = new_tail;
2364 
2365  const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
2366  __TBB_store_with_release(op->status, res);
2367  return res == internal::SUCCEEDED;
2368  }
2369 }; // sequencer_node
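// A usage sketch for sequencer_node; the Msg struct and the lambda used as the Sequencer body
// are assumptions for illustration. The body maps each message to its sequence number and the
// node releases messages in order 0, 1, 2, ... regardless of arrival order:
//
//   struct Msg { size_t seq; int payload; };
//   tbb::flow::graph g;
//   tbb::flow::sequencer_node<Msg> s(g, [](const Msg& m) -> size_t { return m.seq; });
//   tbb::flow::queue_node<Msg> out(g);
//   tbb::flow::make_edge(s, out);
//   Msg m2 = {2, 20}; s.try_put(m2);   // arrives first, is emitted last
//   Msg m0 = {0, 0};  s.try_put(m0);
//   Msg m1 = {1, 10}; s.try_put(m1);
//   g.wait_for_all();                  // out now holds m0, m1, m2 in sequence order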
2370 
2372 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
2373 class priority_queue_node : public buffer_node<T, A> {
2374 public:
2375  typedef T input_type;
2376  typedef T output_type;
2381 
2383  __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
2384  : buffer_node<T, A>(g), compare(comp), mark(0) {
2385  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2386  static_cast<receiver<input_type> *>(this),
2387  static_cast<sender<output_type> *>(this) );
2388  }
2389 
2390 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2391  template <typename... Args>
2392  priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
2393  : priority_queue_node(nodes.graph_reference(), comp) {
2394  make_edges_in_order(nodes, *this);
2395  }
2396 #endif
2397 
2399  __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {
2400  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
2401  static_cast<receiver<input_type> *>(this),
2402  static_cast<sender<output_type> *>(this) );
2403  }
2404 
2405 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2406  void set_name( const char *name ) __TBB_override {
2407  tbb::internal::fgt_node_desc( this, name );
2408  }
2409 #endif
2410 
2411 protected:
2412 
2413  void reset_node( reset_flags f) __TBB_override {
2414  mark = 0;
2415  base_type::reset_node(f);
2416  }
2417 
2418  typedef typename buffer_node<T, A>::size_type size_type;
2419  typedef typename buffer_node<T, A>::item_type item_type;
2420  typedef typename buffer_node<T, A>::buffer_operation prio_operation;
2421 
2423  void internal_forward_task(prio_operation *op) __TBB_override {
2424  this->internal_forward_task_impl(op, this);
2425  }
2426 
2427  void handle_operations(prio_operation *op_list) __TBB_override {
2428  this->handle_operations_impl(op_list, this);
2429  }
2430 
2431  bool internal_push(prio_operation *op) __TBB_override {
2432  prio_push(*(op->elem));
2433  __TBB_store_with_release(op->status, internal::SUCCEEDED);
2434  return true;
2435  }
2436 
2437  void internal_pop(prio_operation *op) __TBB_override {
2438  // if empty or already reserved, don't pop
2439  if ( this->my_reserved == true || this->my_tail == 0 ) {
2440  __TBB_store_with_release(op->status, internal::FAILED);
2441  return;
2442  }
2443 
2444  *(op->elem) = prio();
2445  __TBB_store_with_release(op->status, internal::SUCCEEDED);
2446  prio_pop();
2447 
2448  }
2449 
2450  // pops the highest-priority item, saves copy
2451  void internal_reserve(prio_operation *op) __TBB_override {
2452  if (this->my_reserved == true || this->my_tail == 0) {
2453  __TBB_store_with_release(op->status, internal::FAILED);
2454  return;
2455  }
2456  this->my_reserved = true;
2457  *(op->elem) = prio();
2458  reserved_item = *(op->elem);
2459  __TBB_store_with_release(op->status, internal::SUCCEEDED);
2460  prio_pop();
2461  }
2462 
2463  void internal_consume(prio_operation *op) __TBB_override {
2464  __TBB_store_with_release(op->status, internal::SUCCEEDED);
2465  this->my_reserved = false;
2466  reserved_item = input_type();
2467  }
2468 
2469  void internal_release(prio_operation *op) __TBB_override {
2470  __TBB_store_with_release(op->status, internal::SUCCEEDED);
2471  prio_push(reserved_item);
2472  this->my_reserved = false;
2473  reserved_item = input_type();
2474  }
2475 
2476 private:
2477  template<typename, typename> friend class buffer_node;
2478 
2479  void order() {
2480  if (mark < this->my_tail) heapify();
2481  __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
2482  }
2483 
2484  bool is_item_valid() {
2485  return this->my_tail > 0;
2486  }
2487 
2488  void try_put_and_add_task(task*& last_task) {
2489  task * new_task = this->my_successors.try_put_task(this->prio());
2490  if (new_task) {
2491  // workaround for icc bug
2492  graph& graph_ref = this->graph_reference();
2493  last_task = combine_tasks(graph_ref, last_task, new_task);
2494  prio_pop();
2495  }
2496  }
2497 
2498 private:
2499  Compare compare;
2500  size_type mark;
2501 
2502  input_type reserved_item;
2503 
2504  // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
2505  bool prio_use_tail() {
2506  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
2507  return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2508  }
2509 
2510  // prio_push: checks that the item will fit, expand array if necessary, put at end
2511  void prio_push(const T &src) {
2512  if ( this->my_tail >= this->my_array_size )
2513  this->grow_my_array( this->my_tail + 1 );
2514  (void) this->place_item(this->my_tail, src);
2515  ++(this->my_tail);
2516  __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
2517  }
2518 
2519  // prio_pop: deletes highest priority item from the array, and if it is item
2520  // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
2521  // and mark. Assumes the array has already been tested for emptiness; no failure.
2522  void prio_pop() {
2523  if (prio_use_tail()) {
2524  // there are newly pushed elements; last one higher than top
2525  // copy the data
2526  this->destroy_item(this->my_tail-1);
2527  --(this->my_tail);
2528  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2529  return;
2530  }
2531  this->destroy_item(0);
2532  if(this->my_tail > 1) {
2533  // push the last element down heap
2534  __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2535  this->move_item(0,this->my_tail - 1);
2536  }
2537  --(this->my_tail);
2538  if(mark > this->my_tail) --mark;
2539  if (this->my_tail > 1) // don't reheap for heap of size 1
2540  reheap();
2541  __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
2542  }
2543 
2544  const T& prio() {
2545  return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2546  }
2547 
2548  // turn array into heap
2549  void heapify() {
2550  if(this->my_tail == 0) {
2551  mark = 0;
2552  return;
2553  }
2554  if (!mark) mark = 1;
2555  for (; mark<this->my_tail; ++mark) { // for each unheaped element
2556  size_type cur_pos = mark;
2557  input_type to_place;
2558  this->fetch_item(mark,to_place);
2559  do { // push to_place up the heap
2560  size_type parent = (cur_pos-1)>>1;
2561  if (!compare(this->get_my_item(parent), to_place))
2562  break;
2563  this->move_item(cur_pos, parent);
2564  cur_pos = parent;
2565  } while( cur_pos );
2566  (void) this->place_item(cur_pos, to_place);
2567  }
2568  }
2569 
2570  // otherwise heapified array with new root element; rearrange to heap
2571  void reheap() {
2572  size_type cur_pos=0, child=1;
2573  while (child < mark) {
2574  size_type target = child;
2575  if (child+1<mark &&
2576  compare(this->get_my_item(child),
2577  this->get_my_item(child+1)))
2578  ++target;
2579  // target now has the higher priority child
2580  if (compare(this->get_my_item(target),
2581  this->get_my_item(cur_pos)))
2582  break;
2583  // swap
2584  this->swap_items(cur_pos, target);
2585  cur_pos = target;
2586  child = (cur_pos<<1)+1;
2587  }
2588  }
2589 }; // priority_queue_node
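// A usage sketch for priority_queue_node; names are illustrative. With the default
// Compare = std::less<T> the largest item is forwarded first; supplying std::greater<T>
// reverses that:
//
//   tbb::flow::graph g;
//   tbb::flow::priority_queue_node<int> pq(g);                          // largest first
//   tbb::flow::priority_queue_node<int, std::greater<int> > min_pq(g);  // smallest first
//   pq.try_put(3); pq.try_put(7); pq.try_put(5);
//   g.wait_for_all();
//   int v = 0;
//   pq.try_get(v);   // v == 7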
2590 
2591 } // interfaceX
2592 
2593 namespace interface11 {
2594 
2596 
2599 template< typename T, typename DecrementType=continue_msg >
2600 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
2601 public:
2602  typedef T input_type;
2603  typedef T output_type;
2604  typedef typename receiver<input_type>::predecessor_type predecessor_type;
2605  typedef typename sender<output_type>::successor_type successor_type;
2606 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2607  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
2608  typedef typename sender<output_type>::built_successors_type built_successors_type;
2609  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
2610  typedef typename sender<output_type>::successor_list_type successor_list_type;
2611 #endif
2612  //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
2613 
2614 private:
2615  size_t my_threshold;
2616  size_t my_count; //number of successful puts
2617  size_t my_tries; //number of active put attempts
2618  internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
2619  spin_mutex my_mutex;
2620  internal::broadcast_cache<T> my_successors;
2621  __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
2622 
2624 
2625  // Let decrementer call decrement_counter()
2626  friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
2627 
2628  bool check_conditions() { // always called under lock
2629  return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
2630  }
2631 
2632  // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
2633  task *forward_task() {
2634  input_type v;
2635  task *rval = NULL;
2636  bool reserved = false;
2637  {
2638  spin_mutex::scoped_lock lock(my_mutex);
2639  if ( check_conditions() )
2640  ++my_tries;
2641  else
2642  return NULL;
2643  }
2644 
2645  //SUCCESS
2646  // if we can reserve and can put, we consume the reservation
2647  // we increment the count and decrement the tries
2648  if ( (my_predecessors.try_reserve(v)) == true ){
2649  reserved=true;
2650  if ( (rval = my_successors.try_put_task(v)) != NULL ){
2651  {
2652  spin_mutex::scoped_lock lock(my_mutex);
2653  ++my_count;
2654  --my_tries;
2655  my_predecessors.try_consume();
2656  if ( check_conditions() ) {
2657  if ( internal::is_graph_active(this->my_graph) ) {
2658  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2659  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2660  internal::spawn_in_graph_arena(graph_reference(), *rtask);
2661  }
2662  }
2663  }
2664  return rval;
2665  }
2666  }
2667  //FAILURE
2668  //if we can't reserve, we decrement the tries
2669  //if we can reserve but can't put, we decrement the tries and release the reservation
2670  {
2671  spin_mutex::scoped_lock lock(my_mutex);
2672  --my_tries;
2673  if (reserved) my_predecessors.try_release();
2674  if ( check_conditions() ) {
2675  if ( internal::is_graph_active(this->my_graph) ) {
2676  task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2677  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2678  __TBB_ASSERT(!rval, "Have two tasks to handle");
2679  return rtask;
2680  }
2681  }
2682  return rval;
2683  }
2684  }
2685 
2686  void forward() {
2687  __TBB_ASSERT(false, "Should never be called");
2688  return;
2689  }
2690 
2691  task* decrement_counter( long long delta ) {
2692  {
2693  spin_mutex::scoped_lock lock(my_mutex);
2694  if( delta > 0 && size_t(delta) > my_count )
2695  my_count = 0;
2696  else if( delta < 0 && size_t(delta) > my_threshold - my_count )
2697  my_count = my_threshold;
2698  else
2699  my_count -= size_t(delta); // absolute value of delta is sufficiently small
2700  }
2701  return forward_task();
2702  }
2703 
2704  void initialize() {
2705  my_predecessors.set_owner(this);
2706  my_successors.set_owner(this);
2707  decrement.set_owner(this);
2708  tbb::internal::fgt_node(
2709  CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
2710  static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2711  static_cast<sender<output_type> *>(this)
2712  );
2713  }
2714 public:
2717 
2718 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
2720  "Deprecated interface of the limiter node can be used only in conjunction "
2721  "with continue_msg as the type of DecrementType template parameter." );
2722 #endif // Check for incompatible interface
2723 
2725  limiter_node(graph &g,
2726  __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
2727  : graph_node(g), my_threshold(threshold), my_count(0),
2728  __TBB_DEPRECATED_LIMITER_ARG4(
2729  my_tries(0), decrement(),
2730  init_decrement_predecessors(num_decrement_predecessors),
2731  decrement(num_decrement_predecessors)) {
2732  initialize();
2733  }
2734 
2735 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2736  template <typename... Args>
2737  limiter_node(const node_set<Args...>& nodes, size_t threshold)
2738  : limiter_node(nodes.graph_reference(), threshold) {
2739  make_edges_in_order(nodes, *this);
2740  }
2741 #endif
2742 
2744  limiter_node( const limiter_node& src ) :
2745  graph_node(src.my_graph), receiver<T>(), sender<T>(),
2746  my_threshold(src.my_threshold), my_count(0),
2747  __TBB_DEPRECATED_LIMITER_ARG4(
2748  my_tries(0), decrement(),
2749  init_decrement_predecessors(src.init_decrement_predecessors),
2750  decrement(src.init_decrement_predecessors)) {
2751  initialize();
2752  }
2753 
2754 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2755  void set_name( const char *name ) __TBB_override {
2756  tbb::internal::fgt_node_desc( this, name );
2757  }
2758 #endif
2759 
2761  bool register_successor( successor_type &r ) __TBB_override {
2762  spin_mutex::scoped_lock lock(my_mutex);
2763  bool was_empty = my_successors.empty();
2764  my_successors.register_successor(r);
2765  //spawn a forward task if this is the only successor
2766  if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2767  if ( internal::is_graph_active(this->my_graph) ) {
2768  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2769  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2770  internal::spawn_in_graph_arena(graph_reference(), *task);
2771  }
2772  }
2773  return true;
2774  }
2775 
2777 
2778  bool remove_successor( successor_type &r ) __TBB_override {
2779  r.remove_predecessor(*this);
2780  my_successors.remove_successor(r);
2781  return true;
2782  }
2783 
2784 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
2785  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
2786  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
2787 
2788  void internal_add_built_successor(successor_type &src) __TBB_override {
2789  my_successors.internal_add_built_successor(src);
2790  }
2791 
2792  void internal_delete_built_successor(successor_type &src) __TBB_override {
2793  my_successors.internal_delete_built_successor(src);
2794  }
2795 
2796  size_t successor_count() __TBB_override { return my_successors.successor_count(); }
2797 
2798  void copy_successors(successor_list_type &v) __TBB_override {
2799  my_successors.copy_successors(v);
2800  }
2801 
2802  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
2803  my_predecessors.internal_add_built_predecessor(src);
2804  }
2805 
2806  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
2807  my_predecessors.internal_delete_built_predecessor(src);
2808  }
2809 
2810  size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
2811 
2812  void copy_predecessors(predecessor_list_type &v) __TBB_override {
2813  my_predecessors.copy_predecessors(v);
2814  }
2815 
2816  void extract() __TBB_override {
2817  my_count = 0;
2818  my_successors.built_successors().sender_extract(*this);
2819  my_predecessors.built_predecessors().receiver_extract(*this);
2820  decrement.built_predecessors().receiver_extract(decrement);
2821  }
2822 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
2823 
2825  bool register_predecessor( predecessor_type &src ) __TBB_override {
2826  spin_mutex::scoped_lock lock(my_mutex);
2827  my_predecessors.add( src );
2828  if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
2829  task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2830  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2831  internal::spawn_in_graph_arena(graph_reference(), *task);
2832  }
2833  return true;
2834  }
2835 
2837  bool remove_predecessor( predecessor_type &src ) __TBB_override {
2838  my_predecessors.remove( src );
2839  return true;
2840  }
2841 
2842 protected:
2843 
2844  template< typename R, typename B > friend class run_and_put_task;
2845  template<typename X, typename Y> friend class internal::broadcast_cache;
2846  template<typename X, typename Y> friend class internal::round_robin_cache;
2847  //! Puts an item to this receiver
2848  task * try_put_task( const T &t ) __TBB_override {
2849  {
2850  spin_mutex::scoped_lock lock(my_mutex);
2851  if ( my_count + my_tries >= my_threshold )
2852  return NULL;
2853  else
2854  ++my_tries;
2855  }
2856 
2857  task * rtask = my_successors.try_put_task(t);
2858 
2859  if ( !rtask ) { // try_put_task failed.
2860  spin_mutex::scoped_lock lock(my_mutex);
2861  --my_tries;
2862  if (check_conditions() && internal::is_graph_active(this->my_graph)) {
2863  rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
2864  internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
2865  }
2866  }
2867  else {
2868  spin_mutex::scoped_lock lock(my_mutex);
2869  ++my_count;
2870  --my_tries;
2871  }
2872  return rtask;
2873  }
2874 
2876 
2877  void reset_receiver(reset_flags /*f*/) __TBB_override {
2878  __TBB_ASSERT(false,NULL); // should never be called
2879  }
2880 
2881  void reset_node( reset_flags f) __TBB_override {
2882  my_count = 0;
2883  if(f & rf_clear_edges) {
2884  my_predecessors.clear();
2885  my_successors.clear();
2886  }
2887  else
2888  {
2889  my_predecessors.reset( );
2890  }
2891  decrement.reset_receiver(f);
2892  }
2893 }; // limiter_node
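// A usage sketch for limiter_node; the producer loop and the function_node used as the worker
// are assumptions for illustration. At most `threshold` messages are allowed through; an edge
// into the public `decrement` port releases capacity as downstream work completes:
//
//   tbb::flow::graph g;
//   tbb::flow::limiter_node<int> limit(g, 4);   // at most 4 messages in flight
//   tbb::flow::function_node<int, tbb::flow::continue_msg> work(g, tbb::flow::unlimited,
//       [](int) { return tbb::flow::continue_msg(); });
//   tbb::flow::make_edge(limit, work);
//   tbb::flow::make_edge(work, limit.decrement);   // each completion frees one slot
//   for (int i = 0; i < 100; ++i) limit.try_put(i);
//   g.wait_for_all();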
2894 
2896 
2900 using internal::input_port;
2901 using internal::tag_value;
2902 
2903 template<typename OutputTuple, typename JP=queueing> class join_node;
2904 
2905 template<typename OutputTuple>
2906 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2907 private:
2908  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2909  typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2910 public:
2911  typedef OutputTuple output_type;
2912  typedef typename unfolded_type::input_ports_type input_ports_type;
2913  __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2914  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2915  this->input_ports(), static_cast< sender< output_type > *>(this) );
2916  }
2917 
2918 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2919  template <typename... Args>
2920  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2921  make_edges_in_order(nodes, *this);
2922  }
2923 #endif
2924 
2925  __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2926  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2927  this->input_ports(), static_cast< sender< output_type > *>(this) );
2928  }
2929 
2930 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2931  void set_name( const char *name ) __TBB_override {
2932  tbb::internal::fgt_node_desc( this, name );
2933  }
2934 #endif
2935 
2936 };
2937 
2938 template<typename OutputTuple>
2939 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2940 private:
2941  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2942  typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2943 public:
2944  typedef OutputTuple output_type;
2945  typedef typename unfolded_type::input_ports_type input_ports_type;
2946  __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2947  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2948  this->input_ports(), static_cast< sender< output_type > *>(this) );
2949  }
2950 
2951 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2952  template <typename... Args>
2953  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2954  make_edges_in_order(nodes, *this);
2955  }
2956 #endif
2957 
2958  __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2959  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2960  this->input_ports(), static_cast< sender< output_type > *>(this) );
2961  }
2962 
2963 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
2964  void set_name( const char *name ) __TBB_override {
2965  tbb::internal::fgt_node_desc( this, name );
2966  }
2967 #endif
2968 
2969 };
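// A usage sketch for join_node with the default queueing policy; names are illustrative.
// Each port buffers its inputs and a tuple is emitted once every port has a value;
// input_port<N>() selects the port when making edges:
//
//   tbb::flow::graph g;
//   tbb::flow::queue_node<int> a(g);
//   tbb::flow::queue_node<float> b(g);
//   tbb::flow::join_node< tbb::flow::tuple<int, float> > j(g);   // queueing by default
//   tbb::flow::make_edge(a, tbb::flow::input_port<0>(j));
//   tbb::flow::make_edge(b, tbb::flow::input_port<1>(j));
//   a.try_put(1); b.try_put(2.5f);
//   g.wait_for_all();
//   tbb::flow::tuple<int, float> t;
//   j.try_get(t);   // t == (1, 2.5f)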
2970 
2971 // template for key_matching join_node
2972 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2973 template<typename OutputTuple, typename K, typename KHash>
2974 class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
2975  key_matching_port, OutputTuple, key_matching<K,KHash> > {
2976 private:
2977  static const int N = tbb::flow::tuple_size<OutputTuple>::value;
2978  typedef typename internal::unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
2979 public:
2980  typedef OutputTuple output_type;
2981  typedef typename unfolded_type::input_ports_type input_ports_type;
2982 
2983 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2984  join_node(graph &g) : unfolded_type(g) {}
2985 
2986 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2987  template <typename... Args>
2988  join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
2989  : join_node(nodes.graph_reference()) {
2990  make_edges_in_order(nodes, *this);
2991  }
2992 #endif
2993 
2994 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2995 
2996  template<typename __TBB_B0, typename __TBB_B1>
2997  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2998  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2999  this->input_ports(), static_cast< sender< output_type > *>(this) );
3000  }
3001  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
3002  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
3003  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3004  this->input_ports(), static_cast< sender< output_type > *>(this) );
3005  }
3006  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
3007  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
3008  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3009  this->input_ports(), static_cast< sender< output_type > *>(this) );
3010  }
3011  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
3012  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
3013  unfolded_type(g, b0, b1, b2, b3, b4) {
3014  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3015  this->input_ports(), static_cast< sender< output_type > *>(this) );
3016  }
3017 #if __TBB_VARIADIC_MAX >= 6
3018  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3019  typename __TBB_B5>
3020  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
3021  unfolded_type(g, b0, b1, b2, b3, b4, b5) {
3022  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3023  this->input_ports(), static_cast< sender< output_type > *>(this) );
3024  }
3025 #endif
3026 #if __TBB_VARIADIC_MAX >= 7
3027  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3028  typename __TBB_B5, typename __TBB_B6>
3029  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
3030  unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
3031  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3032  this->input_ports(), static_cast< sender< output_type > *>(this) );
3033  }
3034 #endif
3035 #if __TBB_VARIADIC_MAX >= 8
3036  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3037  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
3038  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3039  __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
3040  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3041  this->input_ports(), static_cast< sender< output_type > *>(this) );
3042  }
3043 #endif
3044 #if __TBB_VARIADIC_MAX >= 9
3045  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3046  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
3047  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3048  __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
3049  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3050  this->input_ports(), static_cast< sender< output_type > *>(this) );
3051  }
3052 #endif
3053 #if __TBB_VARIADIC_MAX >= 10
3054  template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
3055  typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
3056  __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
3057  __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
3058  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3059  this->input_ports(), static_cast< sender< output_type > *>(this) );
3060  }
3061 #endif
3062 
3063 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3064  template <typename... Args, typename... Bodies>
3065  __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
3066  : join_node(nodes.graph_reference(), bodies...) {
3067  make_edges_in_order(nodes, *this);
3068  }
3069 #endif
3070 
3071  __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
3072  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
3073  this->input_ports(), static_cast< sender< output_type > *>(this) );
3074  }
3075 
3076 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3077  void set_name( const char *name ) __TBB_override {
3078  tbb::internal::fgt_node_desc( this, name );
3079  }
3080 #endif
3081 
3082 };
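// A usage sketch for the key_matching form; the Rec struct and the key-extraction lambdas are
// assumptions for illustration. One body per port maps a message to its key, and a tuple is
// emitted only when every port has received a message with the same key:
//
//   struct Rec { int id; double value; };
//   tbb::flow::graph g;
//   tbb::flow::join_node< tbb::flow::tuple<Rec, Rec>, tbb::flow::key_matching<int> > j(g,
//       [](const Rec& r) { return r.id; },
//       [](const Rec& r) { return r.id; });
//   // connect producers to input_port<0>(j) and input_port<1>(j) as usual; a tuple of the
//   // two Recs with matching id is forwarded to j's successors.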
3083 
3084 // indexer node
3086 
3087 // TODO: Implement interface with variadic template or tuple
3088 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
3089  typename T4=null_type, typename T5=null_type, typename T6=null_type,
3090  typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
3091 
3092 //indexer node specializations
3093 template<typename T0>
3094 class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
3095 private:
3096  static const int N = 1;
3097 public:
3098  typedef tuple<T0> InputTuple;
3101  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3102  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3103  this->input_ports(), static_cast< sender< output_type > *>(this) );
3104  }
3105 
3106 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3107  template <typename... Args>
3108  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3109  make_edges_in_order(nodes, *this);
3110  }
3111 #endif
3112 
3113  // Copy constructor
3114  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3115  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3116  this->input_ports(), static_cast< sender< output_type > *>(this) );
3117  }
3118 
3119 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3120  void set_name( const char *name ) __TBB_override {
3121  tbb::internal::fgt_node_desc( this, name );
3122  }
3123 #endif
3124 };
3125 
3126 template<typename T0, typename T1>
3127 class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
3128 private:
3129  static const int N = 2;
3130 public:
3131  typedef tuple<T0, T1> InputTuple;
3134  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3135  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3136  this->input_ports(), static_cast< sender< output_type > *>(this) );
3137  }
3138 
3139 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3140  template <typename... Args>
3141  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3142  make_edges_in_order(nodes, *this);
3143  }
3144 #endif
3145 
3146  // Copy constructor
3147  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3148  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3149  this->input_ports(), static_cast< sender< output_type > *>(this) );
3150  }
3151 
3152 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3153  void set_name( const char *name ) __TBB_override {
3154  tbb::internal::fgt_node_desc( this, name );
3155  }
3156 #endif
3157 };
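// A usage sketch for indexer_node; names are illustrative. Unlike join_node it never waits:
// every message is forwarded immediately as a tagged_msg recording the port it arrived on,
// and cast_to<T>() recovers the payload:
//
//   tbb::flow::graph g;
//   tbb::flow::indexer_node<int, float> idx(g);
//   typedef tbb::flow::indexer_node<int, float>::output_type tagged_type;
//   tbb::flow::function_node<tagged_type> sink(g, tbb::flow::serial,
//       [](const tagged_type& m) -> tbb::flow::continue_msg {
//           if (m.tag() == 0) { int   i = tbb::flow::cast_to<int>(m);   /* from port 0 */ }
//           else              { float f = tbb::flow::cast_to<float>(m); /* from port 1 */ }
//           return tbb::flow::continue_msg();
//       });
//   tbb::flow::make_edge(idx, sink);
//   tbb::flow::input_port<0>(idx).try_put(3);
//   tbb::flow::input_port<1>(idx).try_put(1.5f);
//   g.wait_for_all();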
3158 
3159 template<typename T0, typename T1, typename T2>
3160 class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
3161 private:
3162  static const int N = 3;
3163 public:
3164  typedef tuple<T0, T1, T2> InputTuple;
3167  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3168  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3169  this->input_ports(), static_cast< sender< output_type > *>(this) );
3170  }
3171 
3172 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3173  template <typename... Args>
3174  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3175  make_edges_in_order(nodes, *this);
3176  }
3177 #endif
3178 
3179  // Copy constructor
3180  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3181  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3182  this->input_ports(), static_cast< sender< output_type > *>(this) );
3183  }
3184 
3185 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3186  void set_name( const char *name ) __TBB_override {
3187  tbb::internal::fgt_node_desc( this, name );
3188  }
3189 #endif
3190 };
3191 
3192 template<typename T0, typename T1, typename T2, typename T3>
3193 class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
3194 private:
3195  static const int N = 4;
3196 public:
3197  typedef tuple<T0, T1, T2, T3> InputTuple;
3200  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3201  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3202  this->input_ports(), static_cast< sender< output_type > *>(this) );
3203  }
3204 
3205 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3206  template <typename... Args>
3207  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3208  make_edges_in_order(nodes, *this);
3209  }
3210 #endif
3211 
3212  // Copy constructor
3213  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3214  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3215  this->input_ports(), static_cast< sender< output_type > *>(this) );
3216  }
3217 
3218 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3219  void set_name( const char *name ) __TBB_override {
3220  tbb::internal::fgt_node_desc( this, name );
3221  }
3222 #endif
3223 };
3224 
3225 template<typename T0, typename T1, typename T2, typename T3, typename T4>
3226 class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
3227 private:
3228  static const int N = 5;
3229 public:
3230  typedef tuple<T0, T1, T2, T3, T4> InputTuple;
3233  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3234  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3235  this->input_ports(), static_cast< sender< output_type > *>(this) );
3236  }
3237 
3238 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3239  template <typename... Args>
3240  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3241  make_edges_in_order(nodes, *this);
3242  }
3243 #endif
3244 
3245  // Copy constructor
3246  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3247  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3248  this->input_ports(), static_cast< sender< output_type > *>(this) );
3249  }
3250 
3251 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3252  void set_name( const char *name ) __TBB_override {
3253  tbb::internal::fgt_node_desc( this, name );
3254  }
3255 #endif
3256 };
3257 
3258 #if __TBB_VARIADIC_MAX >= 6
3259 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
3260 class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
3261 private:
3262  static const int N = 6;
3263 public:
3264  typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
3267  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3268  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3269  this->input_ports(), static_cast< sender< output_type > *>(this) );
3270  }
3271 
3272 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3273  template <typename... Args>
3274  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3275  make_edges_in_order(nodes, *this);
3276  }
3277 #endif
3278 
3279  // Copy constructor
3280  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3281  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3282  this->input_ports(), static_cast< sender< output_type > *>(this) );
3283  }
3284 
3285 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3286  void set_name( const char *name ) __TBB_override {
3287  tbb::internal::fgt_node_desc( this, name );
3288  }
3289 #endif
3290 };
3291 #endif //variadic max 6
3292 
3293 #if __TBB_VARIADIC_MAX >= 7
3294 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3295  typename T6>
3296 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3297 private:
3298  static const int N = 7;
3299 public:
3300  typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
3303  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3304  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3305  this->input_ports(), static_cast< sender< output_type > *>(this) );
3306  }
3307 
3308 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3309  template <typename... Args>
3310  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3311  make_edges_in_order(nodes, *this);
3312  }
3313 #endif
3314 
3315  // Copy constructor
3316  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3317  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3318  this->input_ports(), static_cast< sender< output_type > *>(this) );
3319  }
3320 
3321 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3322  void set_name( const char *name ) __TBB_override {
3323  tbb::internal::fgt_node_desc( this, name );
3324  }
3325 #endif
3326 };
3327 #endif //variadic max 7
3328 
3329 #if __TBB_VARIADIC_MAX >= 8
3330 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3331  typename T6, typename T7>
3332 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3333 private:
3334  static const int N = 8;
3335 public:
3336  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
3339  indexer_node(graph& g) : unfolded_type(g) {
3340  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3341  this->input_ports(), static_cast< sender< output_type > *>(this) );
3342  }
3343 
3344 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3345  template <typename... Args>
3346  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3347  make_edges_in_order(nodes, *this);
3348  }
3349 #endif
3350 
3351  // Copy constructor
3352  indexer_node( const indexer_node& other ) : unfolded_type(other) {
3353  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3354  this->input_ports(), static_cast< sender< output_type > *>(this) );
3355  }
3356 
3357 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3358  void set_name( const char *name ) __TBB_override {
3359  tbb::internal::fgt_node_desc( this, name );
3360  }
3361 #endif
3362 };
3363 #endif //variadic max 8
3364 
3365 #if __TBB_VARIADIC_MAX >= 9
3366 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3367  typename T6, typename T7, typename T8>
3368 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3369 private:
3370  static const int N = 9;
3371 public:
3372  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
3375  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3376  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3377  this->input_ports(), static_cast< sender< output_type > *>(this) );
3378  }
3379 
3380 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3381  template <typename... Args>
3382  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3383  make_edges_in_order(nodes, *this);
3384  }
3385 #endif
3386 
3387  // Copy constructor
3388  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3389  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3390  this->input_ports(), static_cast< sender< output_type > *>(this) );
3391  }
3392 
3393 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3394  void set_name( const char *name ) __TBB_override {
3395  tbb::internal::fgt_node_desc( this, name );
3396  }
3397 #endif
3398 };
3399 #endif //variadic max 9
3400 
3401 #if __TBB_VARIADIC_MAX >= 10
3402 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
3403  typename T6, typename T7, typename T8, typename T9>
3404 class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
3405 private:
3406  static const int N = 10;
3407 public:
3408  typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
3411  __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
3412  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3413  this->input_ports(), static_cast< sender< output_type > *>(this) );
3414  }
3415 
3416 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3417  template <typename... Args>
3418  indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
3419  make_edges_in_order(nodes, *this);
3420  }
3421 #endif
3422 
3423  // Copy constructor
3424  __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
3425  tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
3426  this->input_ports(), static_cast< sender< output_type > *>(this) );
3427  }
3428 
3429 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3430  void set_name( const char *name ) __TBB_override {
3431  tbb::internal::fgt_node_desc( this, name );
3432  }
3433 #endif
3434 };
3435 #endif //variadic max 10
3436 
3437 #if __TBB_PREVIEW_ASYNC_MSG
3439 #else
3440 template< typename T >
3441 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
3442 #endif
3443 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3444  s.internal_add_built_predecessor(p);
3445  p.internal_add_built_successor(s);
3446 #endif
3447  p.register_successor( s );
3448  tbb::internal::fgt_make_edge( &p, &s );
3449 }
3450 
3452 template< typename T >
3453 inline void make_edge( sender<T> &p, receiver<T> &s ) {
3454  internal_make_edge( p, s );
3455 }
3456 
3457 #if __TBB_PREVIEW_ASYNC_MSG
3458 template< typename TS, typename TR,
3461 inline void make_edge( TS &p, TR &s ) {
3462  internal_make_edge( p, s );
3463 }
3464 
3465 template< typename T >
3467  internal_make_edge( p, s );
3468 }
3469 
3470 template< typename T >
3472  internal_make_edge( p, s );
3473 }
3474 
3475 #endif // __TBB_PREVIEW_ASYNC_MSG
3476 
3477 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3478 //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
3479 template< typename T, typename V,
3480  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3481 inline void make_edge( T& output, V& input) {
3482  make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3483 }
3484 
3485 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
3486 template< typename T, typename R,
3487  typename = typename T::output_ports_type >
3488 inline void make_edge( T& output, receiver<R>& input) {
3489  make_edge(get<0>(output.output_ports()), input);
3490 }
3491 
3492 //Makes an edge from a sender to port 0 of a multi-input successor.
3493 template< typename S, typename V,
3494  typename = typename V::input_ports_type >
3495 inline void make_edge( sender<S>& output, V& input) {
3496  make_edge(output, get<0>(input.input_ports()));
3497 }
3498 #endif
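// A usage sketch for make_edge; names are illustrative. The basic overload connects a
// sender<T> to a receiver<T>; the convenience overloads above default to port 0 of
// multiport nodes:
//
//   tbb::flow::graph g;
//   tbb::flow::broadcast_node<int> src(g);
//   tbb::flow::queue_node<int> q(g);
//   tbb::flow::join_node< tbb::flow::tuple<int, int> > j(g);
//   tbb::flow::make_edge(src, q);                          // sender -> receiver
//   tbb::flow::make_edge(src, j);                          // same as input_port<0>(j)
//   tbb::flow::make_edge(q, tbb::flow::input_port<1>(j));  // explicit port
//   tbb::flow::remove_edge(src, q);                        // undone again (see remove_edge below)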
3499 
3500 #if __TBB_PREVIEW_ASYNC_MSG
3502 #else
3503 template< typename T >
3504 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
3505 #endif
3506  p.remove_successor( s );
3507 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3508  // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
3509  p.internal_delete_built_successor(s);
3510  s.internal_delete_built_predecessor(p);
3511 #endif
3512  tbb::internal::fgt_remove_edge( &p, &s );
3513  }
3514 
3516 template< typename T >
3517 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
3518  internal_remove_edge( p, s );
3519 }
3520 
3521 #if __TBB_PREVIEW_ASYNC_MSG
3522 template< typename TS, typename TR,
3525 inline void remove_edge( TS &p, TR &s ) {
3526  internal_remove_edge( p, s );
3527 }
3528 
3529 template< typename T >
3531  internal_remove_edge( p, s );
3532 }
3533 
3534 template< typename T >
3536  internal_remove_edge( p, s );
3537 }
3538 #endif // __TBB_PREVIEW_ASYNC_MSG
3539 
3540 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3541 //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
3542 template< typename T, typename V,
3543  typename = typename T::output_ports_type, typename = typename V::input_ports_type >
3544 inline void remove_edge( T& output, V& input) {
3545  remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3546 }
3547 
3548 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
3549 template< typename T, typename R,
3550  typename = typename T::output_ports_type >
3551 inline void remove_edge( T& output, receiver<R>& input) {
3552  remove_edge(get<0>(output.output_ports()), input);
3553 }
3554 //Removes an edge between a sender and port 0 of a multi-input successor.
3555 template< typename S, typename V,
3556  typename = typename V::input_ports_type >
3557 inline void remove_edge( sender<S>& output, V& input) {
3558  remove_edge(output, get<0>(input.input_ports()));
3559 }
3560 #endif
3561 
3562 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3563 template<typename C >
3564 template< typename S >
3565 void internal::edge_container<C>::sender_extract( S &s ) {
3566  edge_list_type e = built_edges;
3567  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3568  remove_edge(s, **i);
3569  }
3570 }
3571 
3572 template<typename C >
3573 template< typename R >
3574 void internal::edge_container<C>::receiver_extract( R &r ) {
3575  edge_list_type e = built_edges;
3576  for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3577  remove_edge(**i, r);
3578  }
3579 }
3580 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
3581 
3583 template< typename Body, typename Node >
3584 Body copy_body( Node &n ) {
3585  return n.template copy_function_object<Body>();
3586 }
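copy_body returns a copy of the body object a node holds internally, which makes state accumulated by the body observable after the graph has run. A minimal sketch (not part of this header); counter_body is an illustrative functor.

#include "tbb/flow_graph.h"

struct counter_body {
    int count;
    counter_body() : count(0) {}
    int operator()(tbb::flow::continue_msg) { return ++count; }
};

int main() {
    using namespace tbb::flow;
    graph g;
    continue_node<int> n(g, counter_body());

    n.try_put(continue_msg());
    n.try_put(continue_msg());
    g.wait_for_all();

    counter_body b = copy_body<counter_body>(n);   // copy of the body stored in n
    return b.count == 2 ? 0 : 1;
}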
3587 
3588 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
3589 
3590 //composite_node
3591 template< typename InputTuple, typename OutputTuple > class composite_node;
3592 
3593 template< typename... InputTypes, typename... OutputTypes>
3594 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
3595 
3596 public:
3597  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3598  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3599 
3600 private:
3601  std::unique_ptr<input_ports_type> my_input_ports;
3602  std::unique_ptr<output_ports_type> my_output_ports;
3603 
3604  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3605  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3606 
3607 protected:
3609 
3610 public:
3611 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3612  composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
3613  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3615  }
3616 #else
3618  tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
3619  }
3620 #endif
3621 
3622  template<typename T1, typename T2>
3623  void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
3624  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3625  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3626  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
3627  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
3628 
3631  }
3632 
3633  template< typename... NodeTypes >
3634  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3635 
3636  template< typename... NodeTypes >
3637  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3638 
3639 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3640  void set_name( const char *name ) __TBB_override {
3642  }
3643 #endif
3644 
3646  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3647  return *my_input_ports;
3648  }
3649 
3650  output_ports_type& output_ports() {
3651  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3652  return *my_output_ports;
3653  }
3654 
3655 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3656  void extract() __TBB_override {
3657  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3658  }
3659 #endif
3660 }; // class composite_node
3661 
3662 //composite_node with only input ports
3663 template< typename... InputTypes>
3664 class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
3665 public:
3666  typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
3667 
3668 private:
3669  std::unique_ptr<input_ports_type> my_input_ports;
3670  static const size_t NUM_INPUTS = sizeof...(InputTypes);
3671 
3672 protected:
3674 
3675 public:
3676 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3677  composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3678  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3680  }
3681 #else
3683  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3684  }
3685 #endif
3686 
3687  template<typename T>
3688  void set_external_ports(T&& input_ports_tuple) {
3689  __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
3690 
3691  my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3692 
3693  tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
3694  }
3695 
3696  template< typename... NodeTypes >
3697  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3698 
3699  template< typename... NodeTypes >
3700  void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3701 
3702 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3703  void set_name( const char *name ) __TBB_override {
3705  }
3706 #endif
3707 
3709  __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
3710  return *my_input_ports;
3711  }
3712 
3713 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3714  void extract() __TBB_override {
3715  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3716  }
3717 #endif
3718 
3719 }; // class composite_node
3720 
3721 //composite_node with only output ports
3722 template<typename... OutputTypes>
3723 class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
3724 public:
3725  typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
3726 
3727 private:
3728  std::unique_ptr<output_ports_type> my_output_ports;
3729  static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
3730 
3731 protected:
3733 
3734 public:
3735 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3736  __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
3737  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3739  }
3740 #else
3742  tbb::internal::fgt_composite( CODEPTR(), this, &g );
3743  }
3744 #endif
3745 
3746  template<typename T>
3747  void set_external_ports(T&& output_ports_tuple) {
3748  __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
3749 
3750  my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3751 
3752  tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
3753  }
3754 
3755  template<typename... NodeTypes >
3756  void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
3757 
3758  template<typename... NodeTypes >
3759  void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
3760 
3761 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3762  void set_name( const char *name ) __TBB_override {
3764  }
3765 #endif
3766 
3767  output_ports_type& output_ports() {
3768  __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
3769  return *my_output_ports;
3770  }
3771 
3772 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3773  void extract() __TBB_override {
3774  __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
3775  }
3776 #endif
3777 
3778 }; // class composite_node
3779 
3780 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
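composite_node packages an internal subgraph behind fixed input and output port tuples; set_external_ports binds those exposed ports to internal nodes, and add_visible_nodes/add_nodes register inner nodes with the tracing support. A minimal sketch (not part of this header); the two-stage pipeline and its names are illustrative.

#include "tbb/flow_graph.h"

class add_then_double
    : public tbb::flow::composite_node< tbb::flow::tuple<int>, tbb::flow::tuple<int> > {
    typedef tbb::flow::composite_node< tbb::flow::tuple<int>, tbb::flow::tuple<int> > base_type;
    tbb::flow::function_node<int, int> add1;
    tbb::flow::function_node<int, int> dbl;
public:
    add_then_double(tbb::flow::graph &g)
        : base_type(g),
          add1(g, tbb::flow::unlimited, [](int v) { return v + 1; }),
          dbl (g, tbb::flow::unlimited, [](int v) { return v * 2; }) {
        tbb::flow::make_edge(add1, dbl);
        set_external_ports(input_ports_type(add1), output_ports_type(dbl));
    }
};

int main() {
    tbb::flow::graph g;
    add_then_double c(g);
    tbb::flow::function_node<int, int> sink(g, tbb::flow::serial, [](int v) { return v; });

    tbb::flow::make_edge(tbb::flow::output_port<0>(c), sink);
    tbb::flow::input_port<0>(c).try_put(3);   // (3 + 1) * 2 = 8 arrives at sink
    g.wait_for_all();
    return 0;
}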
3781 
3782 namespace internal {
3783 
3784 template<typename Gateway>
3786 public:
3787  typedef Gateway gateway_type;
3788 
3789  async_body_base(gateway_type *gateway): my_gateway(gateway) { }
3790  void set_gateway(gateway_type *gateway) {
3791  my_gateway = gateway;
3792  }
3793 
3794 protected:
3795  gateway_type *my_gateway;
3796 };
3797 
3798 template<typename Input, typename Ports, typename Gateway, typename Body>
3799 class async_body: public async_body_base<Gateway> {
3800 public:
3802  typedef Gateway gateway_type;
3803 
3804  async_body(const Body &body, gateway_type *gateway)
3805  : base_type(gateway), my_body(body) { }
3806 
3807  void operator()( const Input &v, Ports & ) {
3808  my_body(v, *this->my_gateway);
3809  }
3810 
3811  Body get_body() { return my_body; }
3812 
3813 private:
3814  Body my_body;
3815 };
3816 
3817 } // namespace internal
3818 
3819 } // namespace interfaceX
3820 namespace interface11 {
3821 
3823 template < typename Input, typename Output,
3824  typename Policy = queueing_lightweight,
3825  typename Allocator=cache_aligned_allocator<Input> >
3826 class async_node : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output > {
3829 
3830 public:
3831  typedef Input input_type;
3832  typedef Output output_type;
3839 
3840 private:
3843  output_port_type *port;
3844  // TODO: pass the value by copy since we do not want to block the asynchronous thread.
3845  const Output *value;
3846  bool result;
3847  try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
3848  void operator()() {
3849  result = port->try_put(*value);
3850  }
3851  };
3852 
3853  class receiver_gateway_impl: public receiver_gateway<Output> {
3854  public:
3855  receiver_gateway_impl(async_node* node): my_node(node) {}
3857  tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3858  my_node->my_graph.reserve_wait();
3859  }
3860 
3862  my_node->my_graph.release_wait();
3863  tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
3864  }
3865 
3867  bool try_put(const Output &i) __TBB_override {
3868  return my_node->try_put_impl(i);
3869  }
3870 
3871  private:
3873  } my_gateway;
3874 
3875  // A substitute for 'this' in member initialization, to prevent compiler warnings
3876  async_node* self() { return this; }
3877 
3879  bool try_put_impl(const Output &i) {
3880  internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
3881  internal::broadcast_cache<output_type>& port_successors = port_0.successors();
3882  tbb::internal::fgt_async_try_put_begin(this, &port_0);
3883  task_list tasks;
3884  bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
3885  __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
3886  "Return status is inconsistent with the method operation." );
3887 
3888  while( !tasks.empty() ) {
3890  }
3891  tbb::internal::fgt_async_try_put_end(this, &port_0);
3892  return is_at_least_one_put_successful;
3893  }
3894 
3895 public:
3896  template<typename Body>
3898  graph &g, size_t concurrency,
3900  Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
3901 #else
3903 #endif
3904  ) : base_type(
3905  g, concurrency,
3906  internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
3907  (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
3908  tbb::internal::fgt_multioutput_node_with_body<1>(
3909  CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
3910  &this->my_graph, static_cast<receiver<input_type> *>(this),
3911  this->output_ports(), this->my_body
3912  );
3913  }
3914 
3915 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
3916  template <typename Body, typename... Args>
3917  __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
3918  : async_node(g, concurrency, body, Policy(), priority) {}
3919 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3920 
3921 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3922  template <typename Body, typename... Args>
3923  __TBB_NOINLINE_SYM async_node(
3924  const node_set<Args...>& nodes, size_t concurrency, Body body,
3926  ) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
3927  make_edges_in_order(nodes, *this);
3928  }
3929 
3930 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3931  template <typename Body, typename... Args>
3932  __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
3933  : async_node(nodes, concurrency, body, Policy(), priority) {}
3934 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
3935 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3936 
3937  __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
3938  static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3939  static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3940 
3941  tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
3942  &this->my_graph, static_cast<receiver<input_type> *>(this),
3943  this->output_ports(), this->my_body );
3944  }
3945 
3946  gateway_type& gateway() {
3947  return my_gateway;
3948  }
3949 
3950 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
3951  void set_name( const char *name ) __TBB_override {
3953  }
3954 #endif
3955 
3956  // Define sender< Output >
3957 
3959  bool register_successor( successor_type &r ) __TBB_override {
3960  return internal::output_port<0>(*this).register_successor(r);
3961  }
3962 
3964  bool remove_successor( successor_type &r ) __TBB_override {
3965  return internal::output_port<0>(*this).remove_successor(r);
3966  }
3967 
3968  template<typename Body>
3972  mfn_body_type &body_ref = *this->my_body;
3973  async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3974  return ab.get_body();
3975  }
3976 
3977 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
3978  typedef typename internal::edge_container<successor_type> built_successors_type;
3980  typedef typename built_successors_type::edge_list_type successor_list_type;
3981  built_successors_type &built_successors() __TBB_override {
3982  return internal::output_port<0>(*this).built_successors();
3983  }
3984 
3985  void internal_add_built_successor( successor_type &r ) __TBB_override {
3986  internal::output_port<0>(*this).internal_add_built_successor(r);
3987  }
3988 
3989  void internal_delete_built_successor( successor_type &r ) __TBB_override {
3990  internal::output_port<0>(*this).internal_delete_built_successor(r);
3991  }
3992 
3993  void copy_successors( successor_list_type &l ) __TBB_override {
3994  internal::output_port<0>(*this).copy_successors(l);
3995  }
3996 
3997  size_t successor_count() __TBB_override {
3998  return internal::output_port<0>(*this).successor_count();
3999  }
4000 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4001 
4002 protected:
4003 
4005  base_type::reset_node(f);
4006  }
4007 };
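async_node lets the graph hand work to an activity it does not manage: the body receives the input value and a gateway, and the external activity later reports its result through gateway().try_put(), bracketed by reserve_wait()/release_wait() so that wait_for_all() does not return before the reply arrives. A minimal sketch (not part of this header); the std::thread worker stands in for any external activity.

#include "tbb/flow_graph.h"
#include <thread>

int main() {
    using namespace tbb::flow;
    typedef async_node<int, int> async_t;

    graph g;
    std::thread worker;

    async_t a(g, unlimited, [&worker](const int &v, async_t::gateway_type &gw) {
        gw.reserve_wait();                  // keep the graph alive until the reply is delivered
        worker = std::thread([v, &gw] {
            gw.try_put(v * v);              // submit the asynchronous result to the graph
            gw.release_wait();              // matching release
        });
    });
    function_node<int, int> sink(g, serial, [](int v) { return v; });

    make_edge(a, sink);
    a.try_put(7);
    g.wait_for_all();                       // returns only after release_wait()
    worker.join();
    return 0;
}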
4008 
4009 #if __TBB_PREVIEW_STREAMING_NODE
4011 #endif // __TBB_PREVIEW_STREAMING_NODE
4012 
4014 
4015 template< typename T >
4016 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
4017 public:
4018  typedef T input_type;
4019  typedef T output_type;
4022 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4023  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
4024  typedef typename sender<output_type>::built_successors_type built_successors_type;
4025  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
4026  typedef typename sender<output_type>::successor_list_type successor_list_type;
4027 #endif
4028 
4029  __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
4030  my_successors.set_owner( this );
4031  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4032  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4033  }
4034 
4035 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4036  template <typename... Args>
4037  overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
4038  make_edges_in_order(nodes, *this);
4039  }
4040 #endif
4041 
4043  __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) :
4044  graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
4045  {
4046  my_successors.set_owner( this );
4047  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
4048  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
4049  }
4050 
4052 
4053 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4054  void set_name( const char *name ) __TBB_override {
4055  tbb::internal::fgt_node_desc( this, name );
4056  }
4057 #endif
4058 
4059  bool register_successor( successor_type &s ) __TBB_override {
4060  spin_mutex::scoped_lock l( my_mutex );
4061  if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
4062  // We have a valid value that must be forwarded immediately.
4063  bool ret = s.try_put( my_buffer );
4064  if ( ret ) {
4065  // We add the successor that accepted our put
4066  my_successors.register_successor( s );
4067  } else {
4068  // In case of reservation, a race can appear between the moment of reservation and this register_successor call,
4069  // because a failed reserve does not mean that the successor is not ready to accept a message immediately.
4070  // That can produce an infinite loop: the reserving node tries to set the edge to pull state,
4071  // while overwrite_node tries to switch it back to push state, so we break the loop by creating a task.
4072  task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
4073  register_predecessor_task( *this, s );
4075  }
4076  } else {
4077  // No valid value yet, just add as successor
4078  my_successors.register_successor( s );
4079  }
4080  return true;
4081  }
4082 
4083  bool remove_successor( successor_type &s ) __TBB_override {
4084  spin_mutex::scoped_lock l( my_mutex );
4085  my_successors.remove_successor(s);
4086  return true;
4087  }
4088 
4089 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4090  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
4091  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
4092 
4093  void internal_add_built_successor( successor_type &s) __TBB_override {
4094  spin_mutex::scoped_lock l( my_mutex );
4095  my_successors.internal_add_built_successor(s);
4096  }
4097 
4098  void internal_delete_built_successor( successor_type &s) __TBB_override {
4099  spin_mutex::scoped_lock l( my_mutex );
4100  my_successors.internal_delete_built_successor(s);
4101  }
4102 
4103  size_t successor_count() __TBB_override {
4104  spin_mutex::scoped_lock l( my_mutex );
4105  return my_successors.successor_count();
4106  }
4107 
4108  void copy_successors(successor_list_type &v) __TBB_override {
4109  spin_mutex::scoped_lock l( my_mutex );
4110  my_successors.copy_successors(v);
4111  }
4112 
4113  void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
4114  spin_mutex::scoped_lock l( my_mutex );
4115  my_built_predecessors.add_edge(p);
4116  }
4117 
4118  void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
4119  spin_mutex::scoped_lock l( my_mutex );
4120  my_built_predecessors.delete_edge(p);
4121  }
4122 
4123  size_t predecessor_count() __TBB_override {
4124  spin_mutex::scoped_lock l( my_mutex );
4125  return my_built_predecessors.edge_count();
4126  }
4127 
4128  void copy_predecessors( predecessor_list_type &v ) __TBB_override {
4129  spin_mutex::scoped_lock l( my_mutex );
4130  my_built_predecessors.copy_edges(v);
4131  }
4132 
4133  void extract() __TBB_override {
4134  my_buffer_is_valid = false;
4135  built_successors().sender_extract(*this);
4136  built_predecessors().receiver_extract(*this);
4137  }
4138 
4139 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
4140 
4141  bool try_get( input_type &v ) __TBB_override {
4142  spin_mutex::scoped_lock l( my_mutex );
4143  if ( my_buffer_is_valid ) {
4144  v = my_buffer;
4145  return true;
4146  }
4147  return false;
4148  }
4149 
4152  return try_get(v);
4153  }
4154 
4156  bool try_release() __TBB_override { return true; }
4157 
4159  bool try_consume() __TBB_override { return true; }
4160 
4161  bool is_valid() {
4162  spin_mutex::scoped_lock l( my_mutex );
4163  return my_buffer_is_valid;
4164  }
4165 
4166  void clear() {
4167  spin_mutex::scoped_lock l( my_mutex );
4168  my_buffer_is_valid = false;
4169  }
4170 
4171 protected:
4172 
4173  template< typename R, typename B > friend class run_and_put_task;
4174  template<typename X, typename Y> friend class internal::broadcast_cache;
4175  template<typename X, typename Y> friend class internal::round_robin_cache;
4176  task * try_put_task( const input_type &v ) __TBB_override {
4177  spin_mutex::scoped_lock l( my_mutex );
4178  return try_put_task_impl(v);
4179  }
4180 
4181  task * try_put_task_impl(const input_type &v) {
4182  my_buffer = v;
4183  my_buffer_is_valid = true;
4184  task * rtask = my_successors.try_put_task(v);
4185  if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
4186  return rtask;
4187  }
4188 
4190  return my_graph;
4191  }
4192 
4195 
4196  register_predecessor_task(predecessor_type& owner, successor_type& succ) :
4197  o(owner), s(succ) {};
4198 
4200  if (!s.register_predecessor(o)) {
4201  o.register_successor(s);
4202  }
4203  return NULL;
4204  }
4205 
4206  predecessor_type& o;
4207  successor_type& s;
4208  };
4209 
4212 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
4213  internal::edge_container<predecessor_type> my_built_predecessors;
4214 #endif
4215  input_type my_buffer;
4218 
4220  my_buffer_is_valid = false;
4221  if (f&rf_clear_edges) {
4222  my_successors.clear();
4223  }
4224  }
4225 }; // overwrite_node
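overwrite_node buffers the most recent value it receives, forwards each new value to its successors, and keeps the buffered value available to try_get until clear() invalidates it. A minimal sketch (not part of this header).

#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    overwrite_node<int> last(g);

    last.try_put(1);
    last.try_put(2);               // overwrites the buffered 1
    g.wait_for_all();

    int v = 0;
    bool got = last.try_get(v);    // got == true, v == 2
    last.clear();                  // invalidate the buffer
    bool valid = last.is_valid();  // now false
    return (got && v == 2 && !valid) ? 0 : 1;
}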
4226 
4227 template< typename T >
4228 class write_once_node : public overwrite_node<T> {
4229 public:
4230  typedef T input_type;
4231  typedef T output_type;
4235 
4237  __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
4238  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4239  static_cast<receiver<input_type> *>(this),
4240  static_cast<sender<output_type> *>(this) );
4241  }
4242 
4243 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4244  template <typename... Args>
4245  write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
4246  make_edges_in_order(nodes, *this);
4247  }
4248 #endif
4249 
4251  __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
4252  tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
4253  static_cast<receiver<input_type> *>(this),
4254  static_cast<sender<output_type> *>(this) );
4255  }
4256 
4257 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
4258  void set_name( const char *name ) __TBB_override {
4259  tbb::internal::fgt_node_desc( this, name );
4260  }
4261 #endif
4262 
4263 protected:
4264  template< typename R, typename B > friend class run_and_put_task;
4265  template<typename X, typename Y> friend class internal::broadcast_cache;
4266  template<typename X, typename Y> friend class internal::round_robin_cache;
4268  spin_mutex::scoped_lock l( this->my_mutex );
4269  return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
4270  }
4271 };
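write_once_node differs from its overwrite_node base only in the try_put_task override above: once the buffer holds a valid value, later puts are ignored until clear() is called. A minimal sketch (not part of this header).

#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    write_once_node<int> once(g);

    once.try_put(5);       // accepted and buffered
    once.try_put(9);       // ignored: the buffer already holds 5
    g.wait_for_all();

    int v = 0;
    once.try_get(v);       // v == 5
    once.clear();          // allow a new value to be written
    once.try_put(9);       // accepted now
    g.wait_for_all();
    once.try_get(v);       // v == 9
    return v == 9 ? 0 : 1;
}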
4272 
4273 } // interfaceX
4274 
4279 
4280  using interface11::graph;
4283 
4302  using namespace interface11::internal::graph_policy_namespace;
4303  using interface11::join_node;
4305  using interface11::copy_body;
4306  using interface11::make_edge;
4309 #if __TBB_FLOW_GRAPH_CPP11_FEATURES
4311 #endif
4313 #if __TBB_PREVIEW_ASYNC_MSG
4314  using interface11::async_msg;
4315 #endif
4316 #if __TBB_PREVIEW_STREAMING_NODE
4317  using interface11::port_ref;
4319 #endif // __TBB_PREVIEW_STREAMING_NODE
4320 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
4322  using internal::no_priority;
4323 #endif
4324 
4325 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
4326  using interface11::internal::follows;
4327  using interface11::internal::precedes;
4328  using interface11::internal::make_node_set;
4329  using interface11::internal::make_edges;
4330 #endif
4331 
4332 } // flow
4333 } // tbb
4334 
4335 // Include deduction guides for node classes
4337 
4338 #undef __TBB_PFG_RESET_ARG
4339 #undef __TBB_COMMA
4340 
4342 #undef __TBB_flow_graph_H_include_area
4343 
4344 #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
4345  #undef __TBB_NOINLINE_SYM
4346 #endif
4347 
4348 #endif // __TBB_flow_graph_H
internal::continue_input< Output, Policy > input_impl_type
Definition: flow_graph.h:1446
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2176
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3266
internal::source_body< output_type > * my_body
Definition: flow_graph.h:1097
bool try_put(const X &t)
Put an item to the receiver.
Definition: flow_graph.h:369
__TBB_NOINLINE_SYM function_node(const function_node &src)
Copy constructor.
Definition: flow_graph.h:1215
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1691
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2292
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1405
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:441
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
Definition: flow_graph.h:1508
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
Definition: flow_graph.h:3020
tbb::task_group_context * my_context
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
Definition: flow_graph.h:3012
interface11::internal::Policy< queueing, lightweight > queueing_lightweight
Definition: flow_graph.h:88
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2379
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls...
Base class for types that should not be assigned.
Definition: tbb_stddef.h:322
void add_task_to_graph_reset_list(tbb::flow::interface10::graph &g, tbb::task *tp)
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2262
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2306
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1570
task * try_put_task_impl(const input_type &v)
Definition: flow_graph.h:4181
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:2135
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2413
internal::port_ref_impl< N1, N2 > port_ref()
Definition: flow_graph.h:42
A generic null type.
Definition: flow_graph.h:103
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
Definition: flow_graph.h:3056
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Definition: flow_graph.h:4199
Implements methods for both executable and function nodes that puts Output to its successors...
Definition: flow_graph.h:854
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3501
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2380
iterator begin()
start iterator
Definition: flow_graph.h:858
__TBB_NOINLINE_SYM write_once_node(graph &g)
Constructor.
Definition: flow_graph.h:4237
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:3835
~graph()
Destroys the graph.
Definition: flow_graph.h:790
virtual void internal_release(buffer_operation *op)
Definition: flow_graph.h:1993
__TBB_DEPRECATED typedef continue_msg input_type
The input type.
Definition: flow_graph.h:595
bool try_get(output_type &v) __TBB_override
Request an item from the node.
Definition: flow_graph.h:996
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:955
#define __TBB_DEPRECATED_LIMITER_ARG2(arg1, arg2)
bool try_consume() __TBB_override
Consumes the reserved item.
Definition: flow_graph.h:4159
__TBB_NOINLINE_SYM multifunction_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:1286
__TBB_NOINLINE_SYM write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
Definition: flow_graph.h:4251
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3147
#define __TBB_NOINLINE_SYM
Definition: flow_graph.h:45
virtual bool try_reserve(T &)
Reserves an item in the sender.
Definition: flow_graph.h:428
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2344
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
Definition: flow_graph.h:2997
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
Definition: flow_graph.h:431
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1411
void prepare_task_arena(bool reinit=false)
void activate()
Activates a node that was created in the inactive state.
Definition: flow_graph.h:1052
__TBB_NOINLINE_SYM broadcast_node(const broadcast_node &src)
Definition: flow_graph.h:1597
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3302
#define CODEPTR()
#define __TBB_DEPRECATED_LIMITER_ARG4(arg1, arg2, arg3, arg4)
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:191
static void fgt_graph(void *)
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
Definition: flow_graph.h:1940
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:716
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1447
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
Definition: flow_graph.h:2825
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
Definition: flow_graph.h:1077
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:4059
bool is_graph_active(tbb::flow::interface10::graph &g)
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
Definition: flow_graph.h:384
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2307
output_ports_type & output_ports()
Definition: flow_graph.h:1397
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3424
bool try_reserve(X &t)
Reserves an item in the sender.
Definition: flow_graph.h:342
static const node_priority_t no_priority
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
Implements methods for an executable node that takes continue_msg as input.
Definition: flow_graph.h:753
task * try_put_task(const T &t) __TBB_override
receive an item, return a task *if possible
Definition: flow_graph.h:2157
void internal_forward_task(queue_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2231
virtual bool try_get(T &)
Request an item from the sender.
Definition: flow_graph.h:425
const V & cast_to(T const &t)
Definition: flow_graph.h:715
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
Definition: flow_graph.h:3198
try_put_functor(output_port_type &p, const Output &v)
Definition: flow_graph.h:3847
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3166
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
Definition: flow_graph.h:464
tbb::flow::interface11::graph_iterator< graph, tbb::flow::interface11::graph_node > iterator
__TBB_NOINLINE_SYM async_node(const async_node &other)
Definition: flow_graph.h:3937
receiver< input_type > receiver_type
Definition: flow_graph.h:3833
bool gather_successful_try_puts(const X &t, task_list &tasks)
Definition: flow_graph.h:511
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
task & pop_front()
Pop the front task from the list.
Definition: task.h:1098
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
__TBB_NOINLINE_SYM async_node(graph &g, size_t concurrency,)
Definition: flow_graph.h:3897
bool remove_successor(successor_type &r) __TBB_override
Removes a successor.
Definition: flow_graph.h:2099
virtual bool try_release()
Releases the reserved item.
Definition: flow_graph.h:318
reference operator*() const
Dereference.
Definition: flow_graph.h:747
bool try_reserve_apply_body(output_type &v)
Definition: flow_graph.h:1105
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
Definition: flow_graph.h:3047
bool try_release() __TBB_override
Release a reserved item.
Definition: flow_graph.h:1030
The graph class.
virtual void finalize() const
Definition: flow_graph.h:147
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4267
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1343
pointer operator->() const
Dereference.
Definition: flow_graph.h:753
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:4189
bool register_successor(successor_type &r) __TBB_override
Adds a new successor.
Definition: flow_graph.h:2037
void set_ref_count(int count)
Set reference count.
Definition: task.h:750
An empty class used for messages that mean "I&#39;m done".
Definition: flow_graph.h:106
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
Definition: flow_graph.h:3337
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
Definition: flow_graph.h:1679
untyped_receiver successor_type
The successor type for this node.
Definition: flow_graph.h:303
task * grab_forwarding_task(buffer_operation &op_data)
Definition: flow_graph.h:1823
#define __TBB_CPP11_PRESENT
Definition: tbb_config.h:149
static void fgt_make_edge(void *, void *)
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:2778
__TBB_NOINLINE_SYM continue_node(const continue_node &src)
Copy constructor.
Definition: flow_graph.h:1528
#define __TBB_DEPRECATED
Definition: tbb_config.h:637
int decrement_ref_count()
Atomically decrement reference count and returns its new value.
Definition: task.h:777
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed...
sender< output_type >::successor_type successor_type
The type of successors of this node.
Definition: flow_graph.h:902
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3134
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:1556
__TBB_DEPRECATED typedef receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:598
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
Definition: flow_graph.h:3856
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1169
internal::tagged_msg< size_t, T0, T1, T2 > output_type
Definition: flow_graph.h:3165
static void fgt_composite(void *, void *, void *)
static void fgt_remove_edge(void *, void *)
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:2875
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
Definition: flow_graph.h:2978
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:4151
virtual void internal_reg_succ(buffer_operation *op)
Register successor.
Definition: flow_graph.h:1854
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:3867
internal::function_body< T, size_t > * my_sequencer
Definition: flow_graph.h:2300
buffer_node< T, A >::buffer_operation sequencer_operation
Definition: flow_graph.h:2345
__TBB_NOINLINE_SYM split_node(const split_node &other)
Definition: flow_graph.h:1383
static tbb::task *const SUCCESSFULLY_ENQUEUED
An abstract cache of successors.
Definition: flow_graph.h:119
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3246
static void fgt_node_desc(const NodeType *, const char *)
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:2144
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
task * decrement_counter(long long delta)
Definition: flow_graph.h:2691
bool try_get(X &t)
Request an item from the sender.
Definition: flow_graph.h:336
Implements an executable node that supports continue_msg -> Output.
Definition: flow_graph.h:1441
concurrency
An enumeration the provides the two most common concurrency levels: unlimited and serial...
Definition: flow_graph.h:98
internal::function_input< input_type, output_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1165
fOutput_type::successor_type successor_type
Definition: flow_graph.h:1449
tbb::task * root_task()
Returns the root task of the graph.
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
Definition: flow_graph.h:1860
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2604
__TBB_NOINLINE_SYM overwrite_node(const overwrite_node &src)
Copy constructor; doesn&#39;t take anything from src; default won&#39;t work.
Definition: flow_graph.h:4043
void call(F &&f, Pack &&p)
Calls the given function with arguments taken from a stored_pack.
__TBB_NOINLINE_SYM buffer_node(const buffer_node &src)
Copy constructor.
Definition: flow_graph.h:2016
static void fgt_reserve_wait(void *)
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
Definition: flow_graph.h:3265
limiter_node(graph &g, __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
Constructor.
Definition: flow_graph.h:2725
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:1013
Detects whether two given types are the same.
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
Definition: flow_graph.h:3879
static void fgt_async_commit(void *, void *)
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:553
static void fgt_begin_body(void *)
void internal_reserve(queue_operation *op) __TBB_override
Definition: flow_graph.h:2244
internal::broadcast_cache< output_type > my_successors
Definition: flow_graph.h:1099
static void fgt_async_try_put_end(void *, void *)
static void fgt_multioutput_node_desc(const NodeType *, const char *)
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:2958
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
Definition: flow_graph.h:3408
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3100
buffer_node< T, A >::size_type size_type
Definition: flow_graph.h:2418
__TBB_NOINLINE_SYM queue_node(graph &g)
Constructor.
Definition: flow_graph.h:2265
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:1708
leaf for multifunction. OutputSet can be a std::tuple or a vector.
Definition: flow_graph.h:203
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
Definition: flow_graph.h:3373
virtual bool try_consume()
Consumes the reserved item.
Definition: flow_graph.h:321
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
void internal_pop(prio_operation *op) __TBB_override
Definition: flow_graph.h:2437
Forwards messages in sequence order.
Definition: flow_graph.h:2299
void reset(tbb::flow::interface11::reset_flags f=tbb::flow::interface11::rf_reset_protocol)
Definition: flow_graph.h:835
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it...
Definition: flow_graph.h:1145
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:1923
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
internal::function_output< output_type > fOutput_type
Definition: flow_graph.h:1167
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
Definition: flow_graph.h:3007
__TBB_NOINLINE_SYM sequencer_node(const sequencer_node &src)
Copy constructor.
Definition: flow_graph.h:2327
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
virtual void handle_operations(buffer_operation *op_list)
Definition: flow_graph.h:1769
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4020
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
Definition: flow_graph.h:3584
virtual bool internal_push(buffer_operation *op)
Definition: flow_graph.h:1964
void internal_consume(prio_operation *op) __TBB_override
Definition: flow_graph.h:2463
__TBB_NOINLINE_SYM join_node(const join_node &other)
Definition: flow_graph.h:2925
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
Definition: flow_graph.h:1366
graph()
Constructs a graph with isolated task_group_context.
Definition: flow_graph.h:766
bool try_consume() __TBB_override
Consumes a reserved item.
Definition: flow_graph.h:1040
__TBB_NOINLINE_SYM priority_queue_node(const priority_queue_node &src)
Copy constructor.
Definition: flow_graph.h:2399
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2191
internal::aggregator< handler_type, buffer_operation > my_aggregator
Definition: flow_graph.h:1767
A lock that occupies a single byte.
Definition: spin_mutex.h:39
void internal_pop(queue_operation *op) __TBB_override
Definition: flow_graph.h:2235
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void reset_receiver(reset_flags f) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:690
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
Definition: flow_graph.h:3038
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
Definition: flow_graph.h:798
task * try_put_task(const T &t) __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync s
Pure virtual template class that defines a receiver of messages of type T.
Definition: flow_graph.h:110
__TBB_NOINLINE_SYM split_node(graph &g)
Definition: flow_graph.h:1368
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4234
bool internal_push(prio_operation *op) __TBB_override
Definition: flow_graph.h:2431
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
Implements a function node that supports Input -> Output.
Definition: flow_graph.h:1161
iterator end()
end iterator
Definition: flow_graph.h:860
virtual void internal_pop(buffer_operation *op)
Definition: flow_graph.h:1970
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3280
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:4233
A task that calls a node&#39;s forward_task function.
Definition: flow_graph.h:271
virtual void internal_consume(buffer_operation *op)
Definition: flow_graph.h:1988
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
Definition: flow_graph.h:2909
untyped_sender predecessor_type
The predecessor type for this node.
Definition: flow_graph.h:362
internal::broadcast_cache< output_type > & successors() __TBB_override
Definition: flow_graph.h:1242
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
Definition: flow_graph.h:1510
Implements async node.
Definition: flow_graph.h:3826
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
Definition: flow_graph.h:2942
tbb::flow::interface11::graph_iterator< const graph, const tbb::flow::interface11::graph_node > const_iterator
async_body(const Body &body, gateway_type *gateway)
Definition: flow_graph.h:3804
const_iterator cbegin() const
start const iterator
Definition: flow_graph.h:866
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3232
__TBB_NOINLINE_SYM continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1453
STL namespace.
const_iterator cend() const
end const iterator
Definition: flow_graph.h:868
task that does nothing. Useful for synchronization.
Definition: task.h:1031
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1685
__TBB_NOINLINE_SYM buffer_node(graph &g)
Constructor.
Definition: flow_graph.h:2000
bool remove_successor(successor_type &r) __TBB_override
Removes s as a successor.
Definition: flow_graph.h:1618
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
void handle_operations(prio_operation *op_list) __TBB_override
Definition: flow_graph.h:2427
limiter_node(const limiter_node &src)
Copy constructor.
Definition: flow_graph.h:2744
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
Definition: flow_graph.h:3029
void increment_ref_count()
Atomically increment reference count.
Definition: task.h:760
bool is_continue_receiver() __TBB_override
Definition: flow_graph.h:705
task * try_put_task(const input_type &) __TBB_override
Definition: flow_graph.h:664
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3388
A cache of successors that are put in a round-robin fashion.
Definition: flow_graph.h:121
internal::source_body< output_type > * my_init_body
Definition: flow_graph.h:1098
void operator()(const Input &v, Ports &)
Definition: flow_graph.h:3807
internal::round_robin_cache< T, null_rw_mutex > my_successors
Definition: flow_graph.h:1717
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3410
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
Definition: flow_graph.h:3827
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
Forwards messages in arbitrary order.
Definition: flow_graph.h:1704
Implements methods for a function node that takes a type Input as input.
Definition: flow_graph.h:638
Output output_type
The type of the output message, which is complete.
Definition: flow_graph.h:899
Breaks an infinite loop between the node reservation and register_successor call. ...
Definition: flow_graph.h:4194
graph & graph_reference() const __TBB_override
Definition: flow_graph.h:1412
base_type::size_type size_type
Definition: flow_graph.h:2209
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3180
receiver_type::predecessor_type predecessor_type
Definition: flow_graph.h:3834
split_node: accepts a tuple as input, forwards each element of the tuple to its
Definition: flow_graph.h:1349
__TBB_DEPRECATED bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
Definition: flow_graph.h:616
void spawn_put()
Spawns a task that applies the body.
Definition: flow_graph.h:1137
Base class for tasks generated by graph nodes.
void internal_reserve(prio_operation *op) __TBB_override
Definition: flow_graph.h:2451
__TBB_DEPRECATED bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
Definition: flow_graph.h:626
buffer_node< T, A >::buffer_operation prio_operation
Definition: flow_graph.h:2420
Forward declaration section.
Definition: flow_graph.h:109
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2488
tbb::flow::interface11::graph_node * my_nodes
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:652
An executable node that acts as a source, i.e. it has no predecessors.
Definition: flow_graph.h:896
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1166
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
Definition: flow_graph.h:3861
The base of all graph nodes.
static void fgt_async_try_put_begin(void *, void *)
__TBB_NOINLINE_SYM function_node(graph &g, size_t concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority=tbb::flow::internal::no_priority))
Constructor.
Definition: flow_graph.h:1181
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:4176
__TBB_DEPRECATED continue_receiver(const continue_receiver &src)
Copy constructor.
Definition: flow_graph.h:609
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3213
void deactivate_graph(tbb::flow::interface10::graph &g)
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:1709
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2180
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
Definition: flow_graph.h:1837
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
Definition: flow_graph.h:3409
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:964
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3101
bool empty() const
True if list is empty; false otherwise.
Definition: task.h:1077
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
Definition: flow_graph.h:3959
#define __TBB_override
Definition: tbb_stddef.h:240
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:4217
internal::decrementer< limiter_node< T, DecrementType >, DecrementType > decrement
The internal receiver< DecrementType > that decrements the count.
Definition: flow_graph.h:2716
unsigned int node_priority_t
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
Definition: flow_graph.h:473
static void fgt_graph_desc(void *, const char *)
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
Definition: flow_graph.h:381
void activate_graph(tbb::flow::interface10::graph &g)
bool try_reserve(T &v) __TBB_override
Reserves an item.
Definition: flow_graph.h:2125
buffer_node< T, A >::item_type item_type
Definition: flow_graph.h:2419
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:2881
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
Definition: flow_graph.h:1400
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3374
bool try_release() __TBB_override
Releases the reserved item.
Definition: flow_graph.h:4156
async_body_base< Gateway > base_type
Definition: flow_graph.h:3801
receiver< input_type >::predecessor_type predecessor_type
Definition: flow_graph.h:2261
void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
void register_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:812
static void fgt_release_wait(void *)
K key_from_message(const T &t)
Definition: flow_graph.h:713
function_body that takes an Input and a set of output ports
Definition: flow_graph.h:193
A cache of predecessors that supports requests and reservations.
Definition: flow_graph.h:123
receiver< TupleType > base_type
Definition: flow_graph.h:1351
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:2877
Forwards messages in FIFO order.
Definition: flow_graph.h:2206
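A short sketch of the FIFO behaviour (values illustrative): items put into a queue_node with no successor stay buffered and come back out in insertion order via try_get.

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::queue_node<int> q(g);
    for (int i = 0; i < 3; ++i) q.try_put(i);   // items are buffered
    g.wait_for_all();
    int v;
    while (q.try_get(v)) {
        // retrieved in FIFO order: 0, 1, 2
    }
    return 0;
}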
Represents acquisition of a mutex.
Definition: spin_mutex.h:53
internal::function_input_queue< input_type, Allocator > input_queue_type
Definition: flow_graph.h:1280
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
Definition: flow_graph.h:3828
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:4083
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4219
base_type::output_ports_type output_ports_type
Definition: flow_graph.h:3838
virtual void internal_reserve(buffer_operation *op)
Definition: flow_graph.h:1979
A cache of successors that are broadcast to.
Definition: flow_graph.h:120
internal::async_body_base< gateway_type > async_body_base_type
Definition: flow_graph.h:3837
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1448
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
Definition: flow_graph.h:2423
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
Definition: flow_graph.h:3964
Forwards messages of type T to all successors.
Definition: flow_graph.h:1565
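A sketch of the broadcast behaviour (bodies illustrative): every successor connected by make_edge receives its own copy of each message.

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> b(g);
    tbb::flow::function_node<int, tbb::flow::continue_msg> s1(
        g, tbb::flow::unlimited, [](int) { return tbb::flow::continue_msg(); });
    tbb::flow::function_node<int, tbb::flow::continue_msg> s2(
        g, tbb::flow::unlimited, [](int) { return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(b, s1);
    tbb::flow::make_edge(b, s2);
    b.try_put(42);              // both s1 and s2 receive 42
    g.wait_for_all();
    return 0;
}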
internal::aggregating_functor< class_type, buffer_operation > handler_type
Definition: flow_graph.h:1765
static void fgt_node(void *, string_index, void *, void *)
__TBB_NOINLINE_SYM overwrite_node(graph &g)
Definition: flow_graph.h:4029
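An overwrite_node keeps a single item that later puts replace and that try_get reads without consuming; a minimal sketch with illustrative values:

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::overwrite_node<int> last_value(g);
    last_value.try_put(1);
    last_value.try_put(2);          // overwrites the stored item
    g.wait_for_all();
    int v;
    if (last_value.try_get(v)) {
        // v == 2; the stored item is not removed by try_get
    }
    return 0;
}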
void try_put_and_add_task(task *&last_task)
Definition: flow_graph.h:2220
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
Definition: flow_graph.h:805
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > base_type
Definition: flow_graph.h:1282
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
Definition: flow_graph.h:2618
void remove_node(tbb::flow::interface11::graph_node *n)
Definition: flow_graph.h:823
__TBB_NOINLINE_SYM indexer_node(graph &g)
Definition: flow_graph.h:3411
Forwards messages only if the threshold has not been reached.
Definition: flow_graph.h:113
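The threshold bounds how many messages may be in flight at once; connecting a downstream node back to the limiter's decrement receiver releases capacity as work completes. A hedged sketch (the threshold and bodies are illustrative):

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::limiter_node<int> limiter(g, 2);   // at most 2 messages in flight
    tbb::flow::function_node<int, tbb::flow::continue_msg> worker(
        g, tbb::flow::unlimited,
        [](int) { return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(limiter, worker);
    // Completion messages sent back to the decrementer free up capacity.
    tbb::flow::make_edge(worker, limiter.decrement);
    for (int i = 0; i < 10; ++i) limiter.try_put(i);
    g.wait_for_all();
    return 0;
}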
Used to form groups of tasks.
Definition: task.h:347
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3517
__TBB_NOINLINE_SYM queue_node(const queue_node &src)
Copy constructor.
Definition: flow_graph.h:2279
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
Implements a function node that supports Input -> (set of outputs).
Definition: flow_graph.h:1260
base_type::buffer_operation queue_operation
Definition: flow_graph.h:2210
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > input_impl_type
Definition: flow_graph.h:1279
static void fgt_multiinput_multioutput_node(void *, string_index, void *, void *)
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3114
__TBB_NOINLINE_SYM multifunction_node(const multifunction_node &other)
Definition: flow_graph.h:1322
void handle_operations_impl(buffer_operation *op_list, derived_type *derived)
Definition: flow_graph.h:1774
void internal_consume(queue_operation *op) __TBB_override
Definition: flow_graph.h:2253
static void fgt_async_reserve(void *, void *)
receiver_gateway< output_type > gateway_type
Definition: flow_graph.h:3836
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
Definition: flow_graph.h:4141
internal::broadcast_cache< T > my_successors
Definition: flow_graph.h:2620
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
Definition: flow_graph.h:2848
__TBB_NOINLINE_SYM continue_node(graph &g, int number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority=tbb::flow::internal::no_priority))
Constructor for executable node with continue_msg -> Output.
Definition: flow_graph.h:1490
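A continue_node runs its body only after it has received a continue_msg from each of its predecessors; with a single predecessor, one message triggers it. A minimal sketch (bodies illustrative):

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
    // Fires once a continue_msg has arrived from every predecessor.
    tbb::flow::continue_node<tbb::flow::continue_msg> barrier(
        g, [](const tbb::flow::continue_msg &) { return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(start, barrier);
    start.try_put(tbb::flow::continue_msg());
    g.wait_for_all();
    return 0;
}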
A task that calls a node's apply_body_bypass function with no input.
Definition: flow_graph.h:321
void reset_node(reset_flags f) __TBB_override
Definition: flow_graph.h:4004
tbb::internal::uint64_t tag_value
Definition: flow_graph.h:29
__TBB_NOINLINE_SYM priority_queue_node(graph &g, const Compare &comp=Compare())
Constructor.
Definition: flow_graph.h:2383
#define __TBB_DEPRECATED_LIMITER_EXPR(expr)
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
Definition: flow_graph.h:3301
input_impl_type::predecessor_type predecessor_type
Definition: flow_graph.h:1168
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
Definition: flow_graph.h:1996
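A sketch of addressing a join_node's ports with input_port<N>() (the tuple type and values are illustrative):

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::join_node< tbb::flow::tuple<int, float> > j(g);
    tbb::flow::function_node< tbb::flow::tuple<int, float>, tbb::flow::continue_msg > consume(
        g, tbb::flow::unlimited,
        [](const tbb::flow::tuple<int, float> &) { return tbb::flow::continue_msg(); });
    tbb::flow::make_edge(j, consume);
    tbb::flow::input_port<0>(j).try_put(1);      // int port
    tbb::flow::input_port<1>(j).try_put(2.5f);   // float port
    g.wait_for_all();
    return 0;
}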
static void fgt_node_with_body(void *, string_index, void *, void *, void *)
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
Definition: flow_graph.h:1278
bool internal_push(sequencer_operation *op) __TBB_override
Definition: flow_graph.h:2348
item_buffer with reservable front-end. NOTE: if reserving, do not complete the operation with pop_front(); use consume_front().
Definition: flow_graph.h:249
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3199
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
Definition: flow_graph.h:1689
__TBB_NOINLINE_SYM source_node(graph &g, Body body, bool is_active=true)
Constructor for a node with a successor.
Definition: flow_graph.h:914
Forwards messages in priority order.
Definition: flow_graph.h:2373
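With the default std::less comparator the largest buffered item is forwarded (or returned by try_get) first; a sketch with illustrative values:

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::priority_queue_node<int> pq(g);   // highest value first by default
    pq.try_put(3);
    pq.try_put(7);
    pq.try_put(5);
    g.wait_for_all();
    int v;
    while (pq.try_get(v)) {
        // yields 7, then 5, then 3
    }
    return 0;
}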
bool try_put(const typename internal::async_helpers< T >::async_type &t)
Definition: flow_graph.h:468
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:4021
A cache of predecessors that only supports try_get.
Definition: flow_graph.h:122
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3338
A list of children.
Definition: task.h:1063
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
Definition: flow_graph.h:3002
internal::tagged_msg< size_t, T0, T1 > output_type
Definition: flow_graph.h:3132
internal::unfolded_indexer_node< InputTuple > unfolded_type
Definition: flow_graph.h:3133
void internal_release(prio_operation *op) __TBB_override
Definition: flow_graph.h:2469
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Definition: flow_graph.h:2837
Base class for user-defined tasks.
Definition: task.h:604
tbb::flow::interface11::graph_node * my_nodes_last
static void fgt_end_body(void *)
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
Definition: flow_graph.h:4211
Enables one or the other code branch.
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
Definition: flow_graph.h:3438
internal::multifunction_output< Output > output_port_type
Definition: flow_graph.h:3842
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Definition: flow_graph.h:4196
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
Definition: flow_graph.h:719
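output_port<N>() is how edges are attached to the individual output ports of a multifunction_node; inside the body, the same ports are reached through the output_ports_type tuple. A hedged sketch (the splitter node and its routing rule are illustrative):

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    typedef tbb::flow::multifunction_node< int, tbb::flow::tuple<int, int> > mf_node_t;
    // Route even values to port 0, odd values to port 1.
    mf_node_t splitter(g, tbb::flow::unlimited,
        [](const int &v, mf_node_t::output_ports_type &ports) {
            if (v % 2 == 0) tbb::flow::get<0>(ports).try_put(v);
            else            tbb::flow::get<1>(ports).try_put(v);
        });
    tbb::flow::queue_node<int> evens(g), odds(g);
    tbb::flow::make_edge(tbb::flow::output_port<0>(splitter), evens);
    tbb::flow::make_edge(tbb::flow::output_port<1>(splitter), odds);
    for (int i = 0; i < 6; ++i) splitter.try_put(i);
    g.wait_for_all();
    return 0;
}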
__TBB_NOINLINE_SYM sequencer_node(graph &g, const Sequencer &s)
Constructor.
Definition: flow_graph.h:2311
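The Sequencer functor maps each item to its ordinal; items are then forwarded strictly in ordinal order starting from 0, regardless of arrival order. A sketch with an illustrative message type:

#include "tbb/flow_graph.h"

struct Msg { size_t seq; int payload; };   // illustrative message carrying its ordinal

int main() {
    tbb::flow::graph g;
    // The sequencer functor extracts each item's sequence number.
    tbb::flow::sequencer_node<Msg> s(g, [](const Msg &m) -> size_t { return m.seq; });
    tbb::flow::queue_node<Msg> out(g);
    tbb::flow::make_edge(s, out);
    Msg a = {1, 10}, b = {0, 20};
    s.try_put(a);                  // arrives out of order and is buffered
    s.try_put(b);
    g.wait_for_all();
    Msg m;
    while (out.try_get(m)) {
        // items emerge in sequence order: seq 0, then seq 1
    }
    return 0;
}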
static task * emit_this(graph &g, const T &t, P &p)
Definition: flow_graph.h:733
void add_nodes_impl(CompositeType *, bool)
Definition: flow_graph.h:958
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
Definition: flow_graph.h:3453
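A sketch pairing make_edge with the remove_edge entry above (nodes and values illustrative): messages put after the edge is removed are no longer forwarded.

#include "tbb/flow_graph.h"

int main() {
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> src(g);
    tbb::flow::queue_node<int> dst(g);
    tbb::flow::make_edge(src, dst);     // dst now receives what src broadcasts
    src.try_put(1);
    tbb::flow::remove_edge(src, dst);   // disconnect; later puts are not forwarded
    src.try_put(2);
    g.wait_for_all();
    int v;
    bool got = dst.try_get(v);          // only the first item (1) was buffered
    (void)got;
    return 0;
}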
internal::tagged_msg< size_t, T0 > output_type
Definition: flow_graph.h:3099
__TBB_DEPRECATED continue_receiver(__TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority))
Constructor.
Definition: flow_graph.h:601
static const void * to_void_ptr(const T &t)
Definition: flow_graph.h:221
sender< output_type >::successor_type successor_type
Definition: flow_graph.h:2605
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
Definition: flow_graph.h:3231
Implements methods for a function node that takes a type Input as input and sends a type Output to its successors.
Definition: flow_graph.h:421
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
Definition: flow_graph.h:2114
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
Definition: flow_graph.h:2761
void spawn_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
Base class for receivers of completion messages.
Definition: flow_graph.h:591
virtual task * execute()=0
Should be overridden by derived classes.
__TBB_NOINLINE_SYM indexer_node(const indexer_node &other)
Definition: flow_graph.h:3316
