Intel(R) Threading Building Blocks Doxygen Documentation version 4.2.3
market.cpp
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #include "tbb/tbb_stddef.h"
18 #include "tbb/global_control.h" // global_control::active_value
19 
20 #include "market.h"
21 #include "tbb_main.h"
22 #include "governor.h"
23 #include "scheduler.h"
24 #include "itt_notify.h"
25 
26 namespace tbb {
27 namespace internal {
28 
29 void market::insert_arena_into_list ( arena& a ) {
30 #if __TBB_TASK_PRIORITY
31  arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
32  arena *&next = my_priority_levels[a.my_top_priority].next_arena;
33 #else /* !__TBB_TASK_PRIORITY */
34  arena_list_type &arenas = my_arenas;
35  arena *&next = my_next_arena;
36 #endif /* !__TBB_TASK_PRIORITY */
37  arenas.push_front( a );
38  if ( arenas.size() == 1 )
39  next = &*arenas.begin();
40 }
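// Editor's note (not part of the original source): "next" is the round-robin cursor
// used by arena_in_need() when an idle worker looks for an arena to serve; seeding it
// here guarantees the cursor is valid whenever the list is non-empty.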
41 
42 void market::remove_arena_from_list ( arena& a ) {
43 #if __TBB_TASK_PRIORITY
44  arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
45  arena *&next = my_priority_levels[a.my_top_priority].next_arena;
46 #else /* !__TBB_TASK_PRIORITY */
47  arena_list_type &arenas = my_arenas;
48  arena *&next = my_next_arena;
49 #endif /* !__TBB_TASK_PRIORITY */
50  arena_list_type::iterator it = next;
51  __TBB_ASSERT( it != arenas.end(), NULL );
52  if ( next == &a ) {
53  if ( ++it == arenas.end() && arenas.size() > 1 )
54  it = arenas.begin();
55  next = &*it;
56  }
57  arenas.remove( a );
58 }
59 
60 //------------------------------------------------------------------------
61 // market
62 //------------------------------------------------------------------------
63 
64 market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size )
65  : my_num_workers_hard_limit(workers_hard_limit)
66  , my_num_workers_soft_limit(workers_soft_limit)
67 #if __TBB_TASK_PRIORITY
68  , my_global_top_priority(normalized_normal_priority)
69  , my_global_bottom_priority(normalized_normal_priority)
70 #endif /* __TBB_TASK_PRIORITY */
71  , my_ref_count(1)
72  , my_stack_size(stack_size)
73  , my_workers_soft_limit_to_report(workers_soft_limit)
74 {
75 #if __TBB_TASK_PRIORITY
76  __TBB_ASSERT( my_global_reload_epoch == 0, NULL );
77  my_priority_levels[normalized_normal_priority].workers_available = my_num_workers_soft_limit;
78 #endif /* __TBB_TASK_PRIORITY */
79 
80  // Once created, the RML server will start initializing workers, which will need
81  // the global market instance to get the worker stack size.
82  my_server = governor::create_rml_server( *this );
83  __TBB_ASSERT( my_server, "Failed to create RML server" );
84 }
85 
86 static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
87  if( int soft_limit = market::app_parallelism_limit() )
88  workers_soft_limit = soft_limit-1;
89  else // if user set no limits (yet), use market's parameter
90  workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
91  if( workers_soft_limit >= workers_hard_limit )
92  workers_soft_limit = workers_hard_limit-1;
93  return workers_soft_limit;
94 }
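// Editor's note, an illustrative walk-through (not part of the original source):
// on a machine with governor::default_num_threads() == 16, no global_control limit set
// (app_parallelism_limit() == 0) and workers_soft_limit passed in as 0, the soft limit
// becomes max(16-1, 0) = 15; that is well below the hard limit (at least 256), so 15
// is returned.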
95 
96 market& market::global_market ( bool is_public, unsigned workers_requested, size_t stack_size ) {
97  global_market_mutex_type::scoped_lock lock( theMarketMutex );
98  market *m = theMarket;
99  if( m ) {
100  ++m->my_ref_count;
101  const unsigned old_public_count = is_public? m->my_public_ref_count++ : /*any non-zero value*/1;
102  lock.release();
103  if( old_public_count==0 )
104  set_active_num_workers( calc_workers_soft_limit(workers_requested, m->my_num_workers_hard_limit) );
105 
106  // do not warn if default number of workers is requested
107  if( workers_requested != governor::default_num_threads()-1 ) {
108  __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
109  "skip_soft_limit_warning must be larger than any valid workers_requested" );
110  unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report;
111  if( soft_limit_to_report < workers_requested ) {
112  runtime_warning( "The number of workers is currently limited to %u. "
113  "The request for %u workers is ignored. Further requests for more workers "
114  "will be silently ignored until the limit changes.\n",
115  soft_limit_to_report, workers_requested );
116  // A race is possible when multiple threads report warnings.
117  // We are OK with that, as it merely produces multiple warnings.
118  as_atomic(m->my_workers_soft_limit_to_report).
119  compare_and_swap(skip_soft_limit_warning, soft_limit_to_report);
120  }
121 
122  }
123  if( m->my_stack_size < stack_size )
124  runtime_warning( "Thread stack size has been already set to %u. "
125  "The request for larger stack (%u) cannot be satisfied.\n",
126  m->my_stack_size, stack_size );
127  }
128  else {
129  // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
130  if( stack_size == 0 )
131  stack_size = global_control::active_value(global_control::thread_stack_size);
132  // Expecting that 4P is suitable for most applications.
133  // Limit to 2P for large thread counts.
134  // TODO: ask RML for max concurrency and possibly correct hard_limit
135  const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
136  // The requested number of threads is intentionally not considered in
137  // computation of the hard limit, in order to separate responsibilities
138  // and avoid complicated interactions between global_control and task_scheduler_init.
139  // The market guarantees that at least 256 threads can be created.
140  const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
141  const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
142  // Create the global market instance
143  size_t size = sizeof(market);
144 #if __TBB_TASK_GROUP_CONTEXT
145  __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(generic_scheduler*) == sizeof(market),
146  "my_workers must be the last data field of the market class");
147  size += sizeof(generic_scheduler*) * (workers_hard_limit - 1);
148 #endif /* __TBB_TASK_GROUP_CONTEXT */
149  __TBB_InitOnce::add_ref();
150  void* storage = NFS_Allocate(1, size, NULL);
151  memset( storage, 0, size );
152  // Initialize and publish global market
153  m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
154  if( is_public )
155  m->my_public_ref_count = 1;
156  theMarket = m;
157  // This check relies on the fact that for shared RML default_concurrency==max_concurrency
158  if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
159  runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
160  , m->my_server->default_concurrency(), workers_soft_limit );
161  }
162  return *m;
163 }
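// Editor's note, illustrative arithmetic for the limits above (not part of the
// original source): with default_num_threads() == 64 the factor is 4, so
// workers_hard_limit = max(max(4*64, 256u), app_parallelism_limit()) is at least 256;
// a 256-thread machine uses factor 2 and gets a hard limit of at least 512.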
164 
165 void market::destroy () {
166 #if __TBB_COUNT_TASK_NODES
167  if ( my_task_node_count )
168  runtime_warning( "Leaked %ld task objects\n", (long)my_task_node_count );
169 #endif /* __TBB_COUNT_TASK_NODES */
170  this->market::~market(); // qualified to suppress warning
171  NFS_Free( this );
172  __TBB_InitOnce::remove_ref();
173 }
174 
175 bool market::release ( bool is_public, bool blocking_terminate ) {
176  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
177  bool do_release = false;
178  {
179  global_market_mutex_type::scoped_lock lock( theMarketMutex );
180  if ( blocking_terminate ) {
181  __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
182  while ( my_public_ref_count == 1 && my_ref_count > 1 ) {
183  lock.release();
184  // To guarantee that request_close_connection() is called by the last master, we need to wait till all
185  // references are released. Re-read my_public_ref_count to limit waiting if new masters are created.
186  // Theoretically, new private references to the market can be added during waiting making it potentially
187  // endless.
188  // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait.
189  // Note that the market should know about its schedulers for cancellation/exception/priority propagation,
190  // see e.g. task_group_context::cancel_group_execution()
192  __TBB_Yield();
193  lock.acquire( theMarketMutex );
194  }
195  }
196  if ( is_public ) {
197  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
198  __TBB_ASSERT( my_public_ref_count, NULL );
199  --my_public_ref_count;
200  }
201  if ( --my_ref_count == 0 ) {
202  __TBB_ASSERT( !my_public_ref_count, NULL );
203  do_release = true;
204  theMarket = NULL;
205  }
206  }
207  if( do_release ) {
208  __TBB_ASSERT( !__TBB_load_with_acquire(my_public_ref_count), "No public references remain if we remove the market." );
209  // inform RML that blocking termination is required
210  my_join_workers = blocking_terminate;
211  my_server->request_close_connection();
212  return blocking_terminate;
213  }
214  return false;
215 }
216 
217 int market::update_workers_request() {
218  int old_request = my_num_workers_requested;
219  my_num_workers_requested = min(my_total_demand, (int)my_num_workers_soft_limit);
220 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
221  if (my_mandatory_num_requested > 0) {
222  __TBB_ASSERT(my_num_workers_soft_limit == 0, NULL);
223  my_num_workers_requested = 1;
224  }
225 #endif
226 #if __TBB_TASK_PRIORITY
227  my_priority_levels[my_global_top_priority].workers_available = my_num_workers_requested;
228  update_allotment(my_global_top_priority);
229 #else
230  update_allotment();
231 #endif
232  return my_num_workers_requested - old_request;
233 }
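// Editor's note, an example (not part of the original source): with my_total_demand == 10
// and my_num_workers_soft_limit == 4, the market requests min(10, 4) == 4 workers;
// callers pass the returned difference from the previous request to
// my_server->adjust_job_count_estimate().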
234 
235 void market::set_active_num_workers ( unsigned soft_limit ) {
236  market *m;
237 
238  {
239  global_market_mutex_type::scoped_lock lock( theMarketMutex );
240  if ( !theMarket )
241  return; // actual value will be used at market creation
242  m = theMarket;
243  if (m->my_num_workers_soft_limit == soft_limit)
244  return;
245  ++m->my_ref_count;
246  }
247  // we now hold a market reference (my_ref_count), so the object can be used safely
248 
249  int delta = 0;
250  {
251  arenas_list_mutex_type::scoped_lock lock(m->my_arenas_list_mutex);
252  __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);
253 
254 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
255 #if __TBB_TASK_PRIORITY
256 #define FOR_EACH_PRIORITY_LEVEL_BEGIN { \
257  for (int p = m->my_global_top_priority; p >= m->my_global_bottom_priority; --p) { \
258  priority_level_info& pl = m->my_priority_levels[p]; \
259  arena_list_type& arenas = pl.arenas;
260 #else
261 #define FOR_EACH_PRIORITY_LEVEL_BEGIN { { \
262  const int p = 0; \
263  arena_list_type& arenas = m->my_arenas;
264 #endif
265 #define FOR_EACH_PRIORITY_LEVEL_END } }
266 
267  if (m->my_num_workers_soft_limit == 0 && m->my_mandatory_num_requested > 0) {
268  FOR_EACH_PRIORITY_LEVEL_BEGIN
269  for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it)
270  if (it->my_global_concurrency_mode)
271  m->disable_mandatory_concurrency_impl(&*it);
272  FOR_EACH_PRIORITY_LEVEL_END
273  }
274  __TBB_ASSERT(m->my_mandatory_num_requested == 0, NULL);
275 #endif
276 
277  as_atomic(m->my_num_workers_soft_limit) = soft_limit;
278  // report only once after new soft limit value is set
279  m->my_workers_soft_limit_to_report = soft_limit;
280 
281 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
282  if (m->my_num_workers_soft_limit == 0) {
283  FOR_EACH_PRIORITY_LEVEL_BEGIN
284  for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it) {
285  if (!it->my_task_stream.empty(p))
286  m->enable_mandatory_concurrency_impl(&*it);
287  }
288  FOR_EACH_PRIORITY_LEVEL_END
289  }
290 #undef FOR_EACH_PRIORITY_LEVEL_BEGIN
291 #undef FOR_EACH_PRIORITY_LEVEL_END
292 #endif
293 
294  delta = m->update_workers_request();
295  }
296  // adjust_job_count_estimate must be called outside of any locks
297  if( delta!=0 )
298  m->my_server->adjust_job_count_estimate( delta );
299  // release internal market reference to match ++m->my_ref_count above
300  m->release( /*is_public=*/false, /*blocking_terminate=*/false );
301 }
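// Editor's note (not part of the original source): this entry point is driven by the
// application's concurrency controls, e.g. a tbb::global_control object limiting
// max_allowed_parallelism to N eventually leads to set_active_num_workers(N-1),
// one slot being reserved for the master thread.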
302 
303 bool governor::does_client_join_workers (const tbb::internal::rml::tbb_client &client) {
304  return ((const market&)client).must_join_workers();
305 }
306 
307 arena* market::create_arena ( int num_slots, int num_reserved_slots, size_t stack_size ) {
308  __TBB_ASSERT( num_slots > 0, NULL );
309  __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
310  // Add public market reference for master thread/task_arena (that adds an internal reference in exchange).
311  market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
312 
313  arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots );
314  // Add newly created arena into the existing market's list.
315  arenas_list_mutex_type::scoped_lock lock(m.my_arenas_list_mutex);
316  m.insert_arena_into_list(a);
317  return &a;
318 }
319 
320 /** This method must be invoked under my_arenas_list_mutex. **/
321 void market::detach_arena ( arena& a ) {
322  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
323  __TBB_ASSERT( !a.my_slots[0].my_scheduler, NULL );
324  if (a.my_global_concurrency_mode)
325  disable_mandatory_concurrency_impl(&a);
326 
327  remove_arena_from_list(a);
328  if ( a.my_aba_epoch == my_arenas_aba_epoch )
329  ++my_arenas_aba_epoch;
330 }
331 
332 void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch ) {
333  bool locked = true;
334  __TBB_ASSERT( a, NULL );
335  // we hold a reference to the market, so it cannot be destroyed at any moment here
336  __TBB_ASSERT( this == theMarket, NULL );
337  __TBB_ASSERT( my_ref_count!=0, NULL );
338  my_arenas_list_mutex.lock();
339  assert_market_valid();
340 #if __TBB_TASK_PRIORITY
341  // scan all priority levels, not only in [my_global_bottom_priority;my_global_top_priority]
342  // range, because the arena being destroyed may have no outstanding request for workers
343  for ( int p = num_priority_levels-1; p >= 0; --p ) {
344  priority_level_info &pl = my_priority_levels[p];
345  arena_list_type &my_arenas = pl.arenas;
346 #endif /* __TBB_TASK_PRIORITY */
347  arena_list_type::iterator it = my_arenas.begin();
348  for ( ; it != my_arenas.end(); ++it ) {
349  if ( a == &*it ) {
350  if ( it->my_aba_epoch == aba_epoch ) {
351  // Arena is alive
352  if ( !a->my_num_workers_requested && !a->my_references ) {
353  __TBB_ASSERT( !a->my_num_workers_allotted && (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers), "Inconsistent arena state" );
354  // Arena is abandoned. Destroy it.
355  detach_arena( *a );
356  my_arenas_list_mutex.unlock();
357  locked = false;
358  a->free_arena();
359  }
360  }
361  if (locked)
362  my_arenas_list_mutex.unlock();
363  return;
364  }
365  }
366 #if __TBB_TASK_PRIORITY
367  }
368 #endif /* __TBB_TASK_PRIORITY */
369  my_arenas_list_mutex.unlock();
370 }
371 
372 /** This method must be invoked under my_arenas_list_mutex. **/
373 arena* market::arena_in_need ( arena_list_type &arenas, arena *hint ) {
374  if ( arenas.empty() )
375  return NULL;
376  arena_list_type::iterator it = hint;
377  __TBB_ASSERT( it != arenas.end(), NULL );
378  do {
379  arena& a = *it;
380  if ( ++it == arenas.end() )
381  it = arenas.begin();
382  if ( a.num_workers_active() < a.my_num_workers_allotted ) {
383  a.my_references += arena::ref_worker;
384  return &a;
385  }
386  } while ( it != hint );
387  return NULL;
388 }
389 
390 int market::update_allotment ( arena_list_type& arenas, int workers_demand, int max_workers ) {
391  __TBB_ASSERT( workers_demand > 0, NULL );
392  max_workers = min(workers_demand, max_workers);
393  int assigned = 0;
394  int carry = 0;
395  for (arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it) {
396  arena& a = *it;
397  if (a.my_num_workers_requested <= 0) {
398  __TBB_ASSERT(!a.my_num_workers_allotted, NULL);
399  continue;
400  }
401  int allotted = 0;
402 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
403  if (my_num_workers_soft_limit == 0) {
404  __TBB_ASSERT(max_workers == 0 || max_workers == 1, NULL);
405  allotted = a.my_global_concurrency_mode && assigned < max_workers ? 1 : 0;
406  } else
407 #endif
408  {
409  int tmp = a.my_num_workers_requested * max_workers + carry;
410  allotted = tmp / workers_demand;
411  carry = tmp % workers_demand;
412  // a.my_num_workers_requested may temporarily exceed a.my_max_num_workers
413  allotted = min(allotted, (int)a.my_max_num_workers);
414  }
415  a.my_num_workers_allotted = allotted;
416  assigned += allotted;
417  }
418  __TBB_ASSERT( 0 <= assigned && assigned <= max_workers, NULL );
419  return assigned;
420 }
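// Editor's note, a worked example of the proportional allotment above (not part of
// the original source): two arenas requesting 3 and 2 workers, workers_demand == 5,
// max_workers == 4:
//   arena 1: tmp = 3*4 + 0 = 12 -> allotted = 12/5 = 2, carry = 2;
//   arena 2: tmp = 2*4 + 2 = 10 -> allotted = 10/5 = 2, carry = 0;
// assigned == 4 == max_workers, i.e. the carry keeps integer rounding from dropping a worker.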
421 
422 /** This method must be invoked under my_arenas_list_mutex. **/
423 bool market::is_arena_in_list( arena_list_type &arenas, arena *a ) {
424  if ( a ) {
425  for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
426  if ( a == &*it )
427  return true;
428  }
429  return false;
430 }
431 
432 #if __TBB_TASK_PRIORITY
433 inline void market::update_global_top_priority ( intptr_t newPriority ) {
434  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.market_prio_switches );
435  my_global_top_priority = newPriority;
436  my_priority_levels[newPriority].workers_available =
437 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
438  my_mandatory_num_requested && !my_num_workers_soft_limit ? 1 :
439 #endif
440  my_num_workers_soft_limit;
441  advance_global_reload_epoch();
442 }
443 
444 inline void market::reset_global_priority () {
445  my_global_bottom_priority = normalized_normal_priority;
446  update_global_top_priority(normalized_normal_priority);
447 }
448 
449 arena* market::arena_in_need ( arena* prev_arena ) {
450  if( as_atomic(my_total_demand) <= 0 )
451  return NULL;
452  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex, /*is_writer=*/false);
453  assert_market_valid();
454  int p = my_global_top_priority;
455  arena *a = NULL;
456 
457  // Check whether the previously served arena is still in the list (i.e. still alive)
458  if ( is_arena_in_list( my_priority_levels[p].arenas, prev_arena ) ) {
459  a = arena_in_need( my_priority_levels[p].arenas, prev_arena );
460  }
461 
462  while ( !a && p >= my_global_bottom_priority ) {
463  priority_level_info &pl = my_priority_levels[p--];
464  a = arena_in_need( pl.arenas, pl.next_arena );
465  if ( a ) {
466  as_atomic(pl.next_arena) = a; // subject to a benign data race under the reader lock
467  // TODO: rework global round robin policy to local or random to avoid this write
468  }
469  // TODO: When refactoring task priority code, take into consideration the
470  // __TBB_TRACK_PRIORITY_LEVEL_SATURATION sections from earlier versions of TBB
471  }
472  return a;
473 }
474 
475 void market::update_allotment ( intptr_t highest_affected_priority ) {
476  intptr_t i = highest_affected_priority;
477  int available = my_priority_levels[i].workers_available;
478  for ( ; i >= my_global_bottom_priority; --i ) {
479  priority_level_info &pl = my_priority_levels[i];
480  pl.workers_available = available;
481  if ( pl.workers_requested ) {
482  available -= update_allotment( pl.arenas, pl.workers_requested, available );
483  if ( available <= 0 ) { // TODO: assertion?
484  available = 0;
485  break;
486  }
487  }
488  }
489  __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
490  for ( --i; i >= my_global_bottom_priority; --i ) {
491  priority_level_info &pl = my_priority_levels[i];
492  pl.workers_available = 0;
493  arena_list_type::iterator it = pl.arenas.begin();
494  for ( ; it != pl.arenas.end(); ++it ) {
495  __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
496  it->my_num_workers_allotted = 0;
497  }
498  }
499 }
500 #endif /* __TBB_TASK_PRIORITY */
501 
502 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
503 void market::enable_mandatory_concurrency_impl ( arena *a ) {
504  __TBB_ASSERT(!a->my_global_concurrency_mode, NULL);
505  __TBB_ASSERT(my_num_workers_soft_limit == 0, NULL);
506 
507  a->my_global_concurrency_mode = true;
508  my_mandatory_num_requested++;
509 }
510 
511 void market::enable_mandatory_concurrency ( arena *a ) {
512  int delta = 0;
513  {
514  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
515  if (my_num_workers_soft_limit != 0 || a->my_global_concurrency_mode)
516  return;
517 
518  enable_mandatory_concurrency_impl(a);
519  delta = update_workers_request();
520  }
521 
522  if (delta != 0)
523  my_server->adjust_job_count_estimate(delta);
524 }
525 
526 void market::disable_mandatory_concurrency_impl(arena* a) {
527  __TBB_ASSERT(a->my_global_concurrency_mode, NULL);
528  __TBB_ASSERT(my_mandatory_num_requested > 0, NULL);
529 
530  a->my_global_concurrency_mode = false;
531  my_mandatory_num_requested--;
532 }
533 
534 void market::mandatory_concurrency_disable ( arena *a ) {
535  int delta = 0;
536  {
537  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
538  if (!a->my_global_concurrency_mode)
539  return;
540  // There is a race window in advertise_new_work between enabling mandatory concurrency and
541  // setting SNAPSHOT_FULL, which gives a request to disable mandatory concurrency a chance to slip in.
542  // Therefore, we double-check that there are no enqueued tasks.
543  if (a->has_enqueued_tasks())
544  return;
545 
546  __TBB_ASSERT(my_num_workers_soft_limit == 0, NULL);
547  disable_mandatory_concurrency_impl(a);
548 
549  delta = update_workers_request();
550  }
551  if (delta != 0)
552  my_server->adjust_job_count_estimate(delta);
553 }
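// Editor's note (not part of the original source): the mandatory-concurrency machinery
// above covers the case my_num_workers_soft_limit == 0, when enqueued tasks would
// otherwise never get a worker; while it is engaged the market asks RML for exactly
// one worker (see update_workers_request) until the enqueued work drains.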
554 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
555 
556 void market::adjust_demand ( arena& a, int delta ) {
557  __TBB_ASSERT( theMarket, "market instance was destroyed prematurely?" );
558  if ( !delta )
559  return;
560  my_arenas_list_mutex.lock();
561  int prev_req = a.my_num_workers_requested;
562  a.my_num_workers_requested += delta;
563  if ( a.my_num_workers_requested <= 0 ) {
564  a.my_num_workers_allotted = 0;
565  if ( prev_req <= 0 ) {
566  my_arenas_list_mutex.unlock();
567  return;
568  }
569  delta = -prev_req;
570  }
571  else if ( prev_req < 0 ) {
572  delta = a.my_num_workers_requested;
573  }
574  my_total_demand += delta;
575 #if !__TBB_TASK_PRIORITY
576  update_allotment();
577 #else /* !__TBB_TASK_PRIORITY */
578  intptr_t p = a.my_top_priority;
579  priority_level_info &pl = my_priority_levels[p];
580  pl.workers_requested += delta;
581  __TBB_ASSERT( pl.workers_requested >= 0, NULL );
582  if ( a.my_num_workers_requested <= 0 ) {
583  if ( a.my_top_priority != normalized_normal_priority ) {
584  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_resets );
585  update_arena_top_priority( a, normalized_normal_priority );
586  }
587  a.my_bottom_priority = normalized_normal_priority;
588  }
589  unsigned effective_soft_limit = my_num_workers_soft_limit;
590  if (my_mandatory_num_requested > 0) {
591  __TBB_ASSERT(effective_soft_limit == 0, NULL);
592  effective_soft_limit = 1;
593 
594  }
595  if ( p == my_global_top_priority ) {
596  if ( !pl.workers_requested ) {
597  while ( --p >= my_global_bottom_priority && !my_priority_levels[p].workers_requested )
598  continue;
599  if ( p < my_global_bottom_priority )
600  reset_global_priority();
601  else
602  update_global_top_priority(p);
603  }
604  my_priority_levels[my_global_top_priority].workers_available = effective_soft_limit;
605  update_allotment( my_global_top_priority );
606  }
607  else if ( p > my_global_top_priority ) {
608  __TBB_ASSERT( pl.workers_requested > 0, NULL );
609  // TODO: investigate if the following invariant is always valid
610  __TBB_ASSERT( a.my_num_workers_requested >= 0, NULL );
611  update_global_top_priority(p);
612  a.my_num_workers_allotted = min( (int)effective_soft_limit, a.my_num_workers_requested );
613  my_priority_levels[p - 1].workers_available = effective_soft_limit - a.my_num_workers_allotted;
614  update_allotment( p - 1 );
615  }
616  else if ( p == my_global_bottom_priority ) {
617  if ( !pl.workers_requested ) {
618  while ( ++p <= my_global_top_priority && !my_priority_levels[p].workers_requested )
619  continue;
620  if ( p > my_global_top_priority )
621  reset_global_priority();
622  else
623  my_global_bottom_priority = p;
624  }
625  else
626  update_allotment( p );
627  }
628  else if ( p < my_global_bottom_priority ) {
629  int prev_bottom = my_global_bottom_priority;
630  my_global_bottom_priority = p;
631  update_allotment( prev_bottom );
632  }
633  else {
634  __TBB_ASSERT( my_global_bottom_priority < p && p < my_global_top_priority, NULL );
635  update_allotment( p );
636  }
637  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested<=0, NULL );
638  assert_market_valid();
639 #endif /* !__TBB_TASK_PRIORITY */
640  if ( delta > 0 ) {
641  // must not overflow the soft limit, but remember the values requested by arenas in
642  // my_total_demand so that workers are not released to RML prematurely
643  if ( my_num_workers_requested+delta > (int)effective_soft_limit)
644  delta = effective_soft_limit - my_num_workers_requested;
645  } else {
646  // the number of workers should not be decreased below my_total_demand
647  if ( my_num_workers_requested+delta < my_total_demand )
648  delta = min(my_total_demand, (int)effective_soft_limit) - my_num_workers_requested;
649  }
650  my_num_workers_requested += delta;
651  __TBB_ASSERT( my_num_workers_requested <= (int)effective_soft_limit, NULL );
652 
653  my_arenas_list_mutex.unlock();
654  // Must be called outside of any locks
655  my_server->adjust_job_count_estimate( delta );
656  GATHER_STATISTIC( governor::local_scheduler_if_initialized() ? ++governor::local_scheduler_if_initialized()->my_counters.gate_switches : 0 );
657 }
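// Editor's note, an example of the clamping above (not part of the original source):
// with effective_soft_limit == 4, my_num_workers_requested == 3 and delta == +5, the
// request is clamped to the limit (delta becomes 4 - 3 = 1), while my_total_demand
// keeps the full demand so the extra workers can be requested later if the limit grows.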
658 
659 void market::process( job& j ) {
660  generic_scheduler& s = static_cast<generic_scheduler&>(j);
661  // s.my_arena can be dead. Don't access it until arena_in_need is called
662  arena *a = s.my_arena;
663  __TBB_ASSERT( governor::is_set(&s), NULL );
664 
665  for (int i = 0; i < 2; ++i) {
666  while ( (a = arena_in_need(a)) ) {
667  a->process(s);
668  a = NULL; // to avoid double checks in arena_in_need(arena*) for the same priority level
669  }
670  // Workers leave the market because no arena is in need. This can happen before
671  // adjust_job_count_estimate() decreases my_slack and RML puts this thread to sleep,
672  // which may result in a busy loop that checks for my_slack<0 and re-enters this method immediately.
673  // The yield below mitigates this spinning.
674  if ( !i )
675  __TBB_Yield();
676  }
677 
678  GATHER_STATISTIC( ++s.my_counters.market_roundtrips );
679 }
680 
681 void market::cleanup( job& j ) {
682  __TBB_ASSERT( theMarket != this, NULL );
683  generic_scheduler& s = static_cast<generic_scheduler&>(j);
684  generic_scheduler* mine = governor::local_scheduler_if_initialized();
685  __TBB_ASSERT( !mine || mine->is_worker(), NULL );
686  if( mine!=&s ) {
687  governor::assume_scheduler( &s );
688  generic_scheduler::cleanup_worker( &s, mine!=NULL );
689  governor::assume_scheduler( mine );
690  } else {
691  generic_scheduler::cleanup_worker( &s, true );
692  }
693 }
694 
695 void market::acknowledge_close_connection() {
696  destroy();
697 }
698 
699 ::rml::job* market::create_one_job() {
700  unsigned index = ++my_first_unused_worker_idx;
701  __TBB_ASSERT( index > 0, NULL );
702  ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
703  // index serves as a hint decreasing conflicts between workers when they migrate between arenas
704  generic_scheduler* s = generic_scheduler::create_worker( *this, index, /* genuine = */ true );
705 #if __TBB_TASK_GROUP_CONTEXT
706  __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
707  __TBB_ASSERT( !my_workers[index - 1], NULL );
708  my_workers[index - 1] = s;
709 #endif /* __TBB_TASK_GROUP_CONTEXT */
710  return s;
711 }
712 
713 #if __TBB_TASK_PRIORITY
714 void market::update_arena_top_priority ( arena& a, intptr_t new_priority ) {
715  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_switches );
716  __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
717  priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
718  &new_level = my_priority_levels[new_priority];
719  remove_arena_from_list(a);
720  a.my_top_priority = new_priority;
721  insert_arena_into_list(a);
722  as_atomic( a.my_reload_epoch ).fetch_and_increment<tbb::release>(); // TODO: synch with global reload epoch in order to optimize usage of local reload epoch
723  prev_level.workers_requested -= a.my_num_workers_requested;
724  new_level.workers_requested += a.my_num_workers_requested;
725  __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
726 }
727 
728 bool market::lower_arena_priority ( arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
729  // TODO: replace the lock with a try_lock loop which performs a double check of the epoch
730  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
731  if ( a.my_reload_epoch != old_reload_epoch ) {
732  assert_market_valid();
733  return false;
734  }
735  __TBB_ASSERT( a.my_top_priority > new_priority, NULL );
736  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
737 
738  intptr_t p = a.my_top_priority;
739  update_arena_top_priority( a, new_priority );
740  if ( a.my_num_workers_requested > 0 ) {
741  if ( my_global_bottom_priority > new_priority ) {
742  my_global_bottom_priority = new_priority;
743  }
744  if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
745  // Global top level became empty
746  for ( --p; p>my_global_bottom_priority && !my_priority_levels[p].workers_requested; --p ) continue;
747  update_global_top_priority(p);
748  }
749  update_allotment( p );
750  }
751 
752  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
753  assert_market_valid();
754  return true;
755 }
756 
757 bool market::update_arena_priority ( arena& a, intptr_t new_priority ) {
758  // TODO: do not acquire this global lock while checking arena's state.
759  arenas_list_mutex_type::scoped_lock lock(my_arenas_list_mutex);
760 
761  tbb::internal::assert_priority_valid(new_priority);
762  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested <= 0, NULL );
763  assert_market_valid();
764  if ( a.my_top_priority == new_priority ) {
765  return false;
766  }
767  else if ( a.my_top_priority > new_priority ) {
768  if ( a.my_bottom_priority > new_priority )
769  a.my_bottom_priority = new_priority;
770  return false;
771  }
772  else if ( a.my_num_workers_requested <= 0 ) {
773  return false;
774  }
775 
776  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
777 
778  intptr_t p = a.my_top_priority;
779  intptr_t highest_affected_level = max(p, new_priority);
780  update_arena_top_priority( a, new_priority );
781 
782  if ( my_global_top_priority < new_priority ) {
783  update_global_top_priority(new_priority);
784  }
785  else if ( my_global_top_priority == new_priority ) {
786  advance_global_reload_epoch();
787  }
788  else {
789  __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
790  __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
791  if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
792  // Global top level became empty
793  __TBB_ASSERT( my_global_bottom_priority < p, NULL );
794  for ( --p; !my_priority_levels[p].workers_requested; --p ) continue;
795  __TBB_ASSERT( p >= new_priority, NULL );
796  update_global_top_priority(p);
797  highest_affected_level = p;
798  }
799  }
800  if ( p == my_global_bottom_priority ) {
801  // Arena priority was increased from the global bottom level.
802  __TBB_ASSERT( p < new_priority, NULL );
803  __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
804  while ( my_global_bottom_priority < my_global_top_priority
805  && !my_priority_levels[my_global_bottom_priority].workers_requested )
806  ++my_global_bottom_priority;
807  __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
808  __TBB_ASSERT( my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
809  }
810  update_allotment( highest_affected_level );
811 
812  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
813  assert_market_valid();
814  return true;
815 }
816 #endif /* __TBB_TASK_PRIORITY */
817 
818 } // namespace internal
819 } // namespace tbb
void cleanup(job &j) __TBB_override
Definition: market.cpp:681
arena_list_type my_arenas
List of registered arenas.
Definition: market.h:135
uintptr_t my_aba_epoch
ABA prevention marker.
Definition: arena.h:231
arena * my_next_arena
The first arena to be checked when idle worker seeks for an arena to enter.
Definition: market.h:139
market(unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size)
Constructor.
Definition: market.cpp:64
void assert_market_valid() const
Definition: market.h:227
void adjust_demand(arena &, int delta)
Request that arena&#39;s need in workers should be adjusted.
Definition: market.cpp:556
tbb::atomic< uintptr_t > my_pool_state
Current task pool state and estimate of available tasks amount.
Definition: arena.h:191
static rml::tbb_server * create_rml_server(rml::tbb_client &)
Definition: governor.cpp:92
static generic_scheduler * local_scheduler_if_initialized()
Definition: governor.h:139
void *__TBB_EXPORTED_FUNC NFS_Allocate(size_t n_element, size_t element_size, void *hint)
Allocate memory on cache/sector line boundary.
atomic< T > & as_atomic(T &t)
Definition: atomic.h:572
bool my_join_workers
Shutdown mode.
Definition: market.h:155
unsigned my_max_num_workers
The number of workers requested by the master thread owning the arena.
Definition: arena.h:181
Release.
Definition: atomic.h:59
unsigned my_ref_count
Reference count controlling market object lifetime.
Definition: market.h:146
void update_allotment()
Recalculates the number of workers assigned to each arena in the list.
Definition: market.h:214
job * create_one_job() __TBB_override
Definition: market.cpp:699
atomic< unsigned > my_first_unused_worker_idx
First unused index of worker.
Definition: market.h:86
arena_slot my_slots[1]
Definition: arena.h:386
rml::tbb_server * my_server
Pointer to the RML server object that services this TBB instance.
Definition: market.h:70
void unlock()
Release lock.
The graph class.
static bool does_client_join_workers(const tbb::internal::rml::tbb_client &client)
Definition: market.cpp:303
void process(job &j) __TBB_override
Definition: market.cpp:659
unsigned num_workers_active() const
The number of workers active in the arena.
Definition: arena.h:330
bool is_worker() const
True if running on a worker thread, false otherwise.
Definition: scheduler.h:673
arena * arena_in_need(arena *)
Returns next arena that needs more workers, or NULL.
Definition: market.h:221
static unsigned default_num_threads()
Definition: governor.h:84
unsigned my_public_ref_count
Count of master threads attached.
Definition: market.h:149
static void cleanup_worker(void *arg, bool worker)
Perform necessary cleanup when a worker thread finishes.
Definition: scheduler.cpp:1327
The scoped locking pattern.
Definition: spin_rw_mutex.h:86
size_t my_stack_size
Stack size of worker threads.
Definition: market.h:152
static bool is_set(generic_scheduler *s)
Used to check validity of the local scheduler TLS contents.
Definition: governor.cpp:120
static const unsigned ref_worker
Definition: arena.h:324
void try_destroy_arena(arena *, uintptr_t aba_epoch)
Removes the arena from the market&#39;s list.
Definition: market.cpp:332
static size_t active_value(parameter p)
arena * my_arena
The arena that I own (if master) or am servicing at the moment (if worker)
Definition: scheduler.h:85
int my_total_demand
Number of workers that were requested by all arenas.
Definition: market.h:89
void remove_arena_from_list(arena &a)
Definition: market.cpp:42
uintptr_t my_arenas_aba_epoch
ABA prevention marker to assign to newly created arenas.
Definition: market.h:143
T min(const T &val1, const T &val2)
Utility template function returning lesser of the two values.
Definition: tbb_misc.h:115
T __TBB_load_with_acquire(const volatile T &location)
Definition: tbb_machine.h:712
void destroy()
Destroys and deallocates market object created by market::create()
Definition: market.cpp:165
#define __TBB_Yield()
Definition: ibm_aix51.h:44
unsigned my_num_workers_hard_limit
Maximal number of workers allowed for use by the underlying resource manager.
Definition: market.h:74
static void remove_ref()
Remove reference to resources. If last reference removed, release the resources.
Definition: tbb_main.cpp:122
#define ITT_THREAD_SET_NAME(name)
Definition: itt_notify.h:117
int my_num_workers_requested
Number of workers currently requested from RML.
Definition: market.h:81
#define __TBB_TASK_PRIORITY
Definition: tbb_config.h:572
#define _T(string_literal)
Standard Windows style macro to markup the string literals.
Definition: itt_notify.h:62
void __TBB_EXPORTED_FUNC runtime_warning(const char *format,...)
Report a runtime warning.
static void add_ref()
Add reference to resources. If first reference added, acquire the resources.
Definition: tbb_main.cpp:117
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
Definition: tbb_stddef.h:266
bool release(bool is_public, bool blocking_terminate)
Decrements market&#39;s refcount and destroys it in the end.
Definition: market.cpp:175
void process(generic_scheduler &)
Registers the worker with the arena and enters TBB scheduler dispatch loop.
Definition: arena.cpp:102
static global_market_mutex_type theMarketMutex
Mutex guarding creation/destruction of theMarket, insertions/deletions in my_arenas, and cancellation propagation.
Definition: market.h:63
static const unsigned skip_soft_limit_warning
The value indicating that the soft limit warning is unnecessary.
Definition: market.h:158
static const pool_state_t SNAPSHOT_EMPTY
No tasks to steal since last snapshot was taken.
Definition: arena.h:314
static market & global_market(bool is_public, unsigned max_num_workers=0, size_t stack_size=0)
Factory method creating new market object.
Definition: market.cpp:96
void __TBB_EXPORTED_FUNC NFS_Free(void *)
Free memory allocated by NFS_Allocate.
T max(const T &val1, const T &val2)
Utility template function returning greater of the two values.
Definition: tbb_misc.h:124
static market * theMarket
Currently active global market.
Definition: market.h:58
unsigned my_workers_soft_limit_to_report
Either workers soft limit to be reported via runtime_warning() or skip_soft_limit_warning.
Definition: market.h:161
unsigned my_num_workers_soft_limit
Current application-imposed limit on the number of workers (see set_active_num_workers()) ...
Definition: market.h:78
static void assume_scheduler(generic_scheduler *s)
Temporarily set TLS slot to the given scheduler.
Definition: governor.cpp:116
static void set_active_num_workers(unsigned w)
Set number of active workers.
Definition: market.cpp:235
static arena & allocate_arena(market &, unsigned num_slots, unsigned num_reserved_slots)
Allocate an instance of arena.
Definition: arena.cpp:241
void acknowledge_close_connection() __TBB_override
Definition: market.cpp:695
arenas_list_mutex_type my_arenas_list_mutex
Definition: market.h:67
static generic_scheduler * create_worker(market &m, size_t index, bool geniune)
Initialize a scheduler for a worker thread.
Definition: scheduler.cpp:1269
static bool UsePrivateRML
Definition: governor.h:64
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
static unsigned app_parallelism_limit()
Reports active parallelism level according to user&#39;s settings.
Definition: tbb_main.cpp:513
unsigned my_num_workers_allotted
The number of workers that have been marked out by the resource manager to service the arena...
Definition: arena.h:143
void detach_arena(arena &)
Removes the arena from the market&#39;s list.
Definition: market.cpp:321
void lock()
Acquire writer lock.
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit)
Definition: market.cpp:86
void free_arena()
Completes arena shutdown, destructs and deallocates it.
Definition: arena.cpp:252
static arena * create_arena(int num_slots, int num_reserved_slots, size_t stack_size)
Creates an arena object.
Definition: market.cpp:307
int my_num_workers_requested
The number of workers that are currently requested from the resource manager.
Definition: arena.h:184
bool is_arena_in_list(arena_list_type &arenas, arena *a)
Definition: market.cpp:423
void insert_arena_into_list(arena &a)
Definition: market.cpp:29
static const intptr_t num_priority_levels
Work stealing task scheduler.
Definition: scheduler.h:137
#define GATHER_STATISTIC(x)
atomic< unsigned > my_references
Reference counter for the arena.
Definition: arena.h:149
bool has_enqueued_tasks()
Check for the presence of enqueued tasks at all priority levels.
Definition: arena.cpp:382
generic_scheduler * my_scheduler
Scheduler of the thread attached to the slot.
