1 module dawait; 2 3 import std.parallelism; 4 import std.container; 5 import core.thread.fiber; 6 import core.sync.semaphore; 7 8 version(unittest) import fluent.asserts; 9 10 private SList!Fiber fibersQueued = SList!Fiber(); 11 private size_t globalWaitingOnThreads = 0; 12 private __gshared Semaphore globalSync; 13 14 /** 15 Creates an async task. 16 An async task is a task that will be running in a separate fiber, independent 17 from the current fiber. 18 19 Params: 20 task = The task to run. 21 */ 22 void async(void delegate() task) 23 { 24 auto fiber = new Fiber(task); 25 fibersQueued.insert(fiber); 26 } 27 28 @("async queues task") 29 unittest 30 { 31 scope(exit) fibersQueued = SList!Fiber(); 32 fibersQueued.empty.should.equal(true).because("there should be no queued tasks at first"); 33 async({}); 34 fibersQueued.empty.should.equal(false).because("there should be a single task"); 35 } 36 37 @("async should not immediately execute its task") 38 unittest 39 { 40 scope(exit) fibersQueued = SList!Fiber(); 41 bool executed = false; 42 auto executeIt = {executed = true;}; 43 async(executeIt); 44 executed.should.equal(false).because("async should not execute its operand"); 45 } 46 47 /** 48 Runs the argument in a separate task, waiting for the result. 49 */ 50 T await(T)(lazy T task) 51 in (Fiber.getThis() !is null && globalSync !is null) 52 { 53 globalWaitingOnThreads++; 54 shared finished = false; 55 56 auto semaphore = globalSync; 57 T result; 58 scopedTask( 59 { 60 scope(exit) finished = true; 61 assert(semaphore !is null); 62 result = task; 63 semaphore.notify(); 64 }).executeInNewThread(); 65 66 while (!finished) 67 { 68 Fiber.yield(); 69 } 70 globalWaitingOnThreads--; 71 72 return result; 73 } 74 75 @("await can run a quick thread") 76 unittest 77 { 78 scope(exit) fibersQueued = SList!Fiber(); 79 bool executed = false; 80 startScheduler( 81 { 82 await(executed = true); 83 }); 84 executed.should.equal(true).because("a quick thread should run"); 85 } 86 87 @("await can run a slow thread") 88 unittest 89 { 90 scope(exit) fibersQueued = SList!Fiber(); 91 bool executed = false; 92 93 bool largeTask() 94 { 95 import core.thread : Thread; 96 Thread.sleep(2.seconds); 97 executed = true; 98 return true; 99 } 100 101 startScheduler( 102 { 103 await(largeTask()); 104 }); 105 executed.should.equal(true).because("a slow thread should run"); 106 } 107 108 @("await should return the value that was calculated") 109 unittest 110 { 111 scope(exit) fibersQueued = SList!Fiber(); 112 bool executed = false; 113 114 bool someTask() 115 { 116 return true; 117 } 118 119 startScheduler( 120 { 121 executed = await(someTask()); 122 }); 123 executed.should.equal(true).because("a slow thread should run"); 124 } 125 126 /** 127 Starts the scheduler. 128 */ 129 void startScheduler(void delegate() firstTask) 130 { 131 globalSync = new Semaphore; 132 async({firstTask();}); 133 134 while (!fibersQueued.empty) 135 { 136 auto fibersRunning = fibersQueued; 137 fibersQueued = SList!Fiber(); 138 foreach (Fiber fiber; fibersRunning) 139 { 140 fiber.call(); 141 if (fiber.state != Fiber.State.TERM) 142 fibersQueued.insert(fiber); 143 } 144 145 if (globalWaitingOnThreads > 0) 146 { 147 globalSync.wait(); 148 } 149 } 150 } 151 152 @("startScheduler should run initial task") 153 unittest 154 { 155 scope(exit) fibersQueued = SList!Fiber(); 156 bool executed = false; 157 startScheduler({executed = true;}); 158 executed.should.equal(true).because("startScheduler should execute the initial task"); 159 } 160 161 @("startScheduler should also run tasks registered before itself") 162 unittest 163 { 164 scope(exit) fibersQueued = SList!Fiber(); 165 bool executed = false; 166 async({executed = true;}); 167 startScheduler({}); 168 executed.should.equal(true).because("startScheduler should execute the task executed before itself"); 169 } 170 171 @("startScheduler should also run tasks registered by the initial task") 172 unittest 173 { 174 scope(exit) fibersQueued = SList!Fiber(); 175 bool executed = false; 176 startScheduler( 177 { 178 async({executed = true;}); 179 }); 180 executed.should.equal(true).because("startScheduler should execute the task created during the initial task"); 181 }