The Massiv provides platform-independent interfaces for accessing threading and synchronization facilities. Although they were implemented primarily for use by the Core, the API is also exposed to applications. However, since the Core (and thus distributed applications) is single-threaded, worker threads spawned by an application must not interact with the Core or the application at all. They can nonetheless be used by the UI and sound subsystems of Massiv clients when the standard polling mechanism of hooking next-tick callbacks (see Section 16.2, “System Interface”) is not sufficient. The API is described thoroughly in the Massiv Core Programmer's Reference and will not be discussed here in detail.
> **Warning**
>
> The thread and synchronization API can be used locally only. It is not available on the "object level" and cannot be used by the distributed application (i.e. for locking local resources from remote nodes).
The API of the thread subsystem is generally similar to what you would find in common operating systems. It is accessible through Thread object instances and the ThreadManager global object. While Thread objects return information about their corresponding threads (the static methods return information about the current thread), ThreadManager provides access to the whole threading subsystem and allows new threads to be launched. When a new thread is launched, a corresponding Thread object is created.
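The Massiv-specific calls are documented in the Programmer's Reference; as a rough standard-C++ analogy (the names below are illustrative, not the Massiv API), launching a worker and joining it looks like:

```cpp
#include <thread>
#include <atomic>

// Illustrative std::thread analogue of launching a worker via
// Global::thread_manager().create( entry_point, arg ) and waiting
// for it with Thread::join().
int run_worker()
{
    std::atomic< int > result( 0 );

    // Launch the thread; it runs concurrently with the caller.
    std::thread worker( [ &result ]() { result = 42; } );

    // Block until the thread finishes, like Thread::join().
    worker.join();
    return result;
}
```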
To let threads synchronize with one another, synchronization primitives are implemented as well. These are CriticalSection and Semaphore.
Critical sections implement simple guarded sections for mutual exclusion of the threads involved (only one thread can be inside the section at a time; other threads are not allowed to step in until the first thread has left the section). To enter the section, call the enter() method. If another thread has already entered the section, the calling thread will be blocked until that thread leaves it (leave()).
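In standard C++ terms, the enter()/leave() pair behaves like lock()/unlock() on a std::mutex. A minimal sketch of the mutual-exclusion guarantee (illustrative analogy only, not the Massiv API):

```cpp
#include <mutex>
#include <thread>
#include <vector>

// The mutex plays the role of a CriticalSection: only one thread
// may hold it at a time, so increments of the shared counter
// cannot interleave.
std::mutex section;
int counter = 0;

void increment_many( int n )
{
    for( int i = 0; i < n; ++i )
    {
        section.lock();    // like CriticalSection::enter()
        ++counter;         // guarded section
        section.unlock();  // like CriticalSection::leave()
    }
}

int run_counters()
{
    std::vector< std::thread > threads;
    for( int i = 0; i < 4; ++i )
        threads.emplace_back( increment_many, 1000 );
    for( auto & t : threads )
        t.join();
    return counter;  // 4 threads x 1000 increments, no lost updates
}
```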
A Semaphore is another synchronization primitive. A non-negative integer counter is associated with each semaphore instance, which essentially denotes how many more threads can enter the guarded section. Initially the counter is set to the value specified at semaphore construction. Semaphores provide atomic operations that either increment (up()) or decrement (down()) the semaphore's value. If the value cannot be decremented (because it is already 0), the calling thread is blocked in down() until another thread increments it. Typically the value is decremented when the guarded section is entered and incremented when the section is left.
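The described semantics can be sketched in standard C++ with a mutex and a condition variable (a hypothetical stand-in; the Massiv Semaphore is created via the SemaphoreManager instead):

```cpp
#include <condition_variable>
#include <mutex>

// Minimal counting-semaphore sketch: down() blocks while the
// counter is 0, up() increments it and wakes one waiter.
class CountingSemaphore
{
public:
    explicit CountingSemaphore( unsigned initial ) : value_( initial ) {}

    void down()
    {
        std::unique_lock< std::mutex > lock( mutex_ );
        // Wait until the counter can be decremented.
        condition_.wait( lock, [ this ]() { return value_ > 0; } );
        --value_;
    }

    void up()
    {
        {
            std::lock_guard< std::mutex > lock( mutex_ );
            ++value_;
        }
        condition_.notify_one();
    }

    unsigned value() const
    {
        std::lock_guard< std::mutex > lock( mutex_ );
        return value_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable condition_;
    unsigned value_;
};
```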
Both CriticalSection and Semaphore instances are created and managed by corresponding managers (CriticalSectionManager, SemaphoreManager), accessible as global objects.
The following example shows basic usage of the threading and synchronization subsystems. The interfaces are used in a straightforward way; their exact description can be found in the Massiv Core Programmer's Reference if needed.
```cpp
struct Item { ... };

void produce( Item & item );
void consume( Item & item );

int producer( VariantPointer arg );
int consumer( VariantPointer arg );

const int max_count = 10;

/* free_count counts free queue slots, full_count counts queued items. */
Semaphore * free_count = Global::semaphore_manager().create( max_count );
Semaphore * full_count = Global::semaphore_manager().create( 0 );
CriticalSection * section = Global::critical_section_manager().create();

int count = 0;
Item queue[ max_count ];
bool quit = false;

Reference< Thread > p_thread = Global::thread_manager().create( producer, &quit );
Reference< Thread > c_thread = Global::thread_manager().create( consumer, &quit );

int producer( VariantPointer arg )
{
    bool & quit = *variant_cast< bool * >( arg );
    int num_items_produced = 0;

    while( true )
    {
        free_count->down();
        {
            section->enter();
            if( quit )
            {
                section->leave();
                return num_items_produced;
            }
            produce( queue[ count++ ] );
            num_items_produced++;
            section->leave();
        }
        full_count->up();
    }
}

int consumer( VariantPointer arg )
{
    bool & quit = *variant_cast< bool * >( arg );
    int num_items_consumed = 0;

    while( true )
    {
        full_count->down();
        {
            section->enter();
            if( quit )
            {
                section->leave();
                return num_items_consumed;
            }
            consume( queue[ --count ] );
            num_items_consumed++;
            section->leave();
        }
        free_count->up();
    }
}

void stop()
{
    section->enter();
    quit = true;
    section->leave();

    /* Stop producer. */
    free_count->up();
    std::cout << "num_items_produced = " << p_thread->join() << '\n';

    /* Stop consumer. */
    full_count->up();
    std::cout << "num_items_consumed = " << c_thread->join() << '\n';
}
```