1 module dawait;
2 
3 import std.parallelism;
4 import std.container;
5 import core.thread.fiber;
6 import core.sync.semaphore;
7 
8 version(unittest) import fluent.asserts;
9 
10 private SList!Fiber fibersQueued = SList!Fiber();
11 private size_t globalWaitingOnThreads = 0;
12 private __gshared Semaphore globalSync;
13 
14 /**
15 Creates an async task.
16 An async task is a task that will be running in a separate fiber, independent
17 from the current fiber.
18 
19 Params:
20 	task = The task to run.
21 */
22 void async(void delegate() task)
23 {
24 	auto fiber = new Fiber(task);
25 	fibersQueued.insert(fiber);
26 }
27 
28 @("async queues task")
29 unittest
30 {
31 	scope(exit) fibersQueued = SList!Fiber();
32 	fibersQueued.empty.should.equal(true).because("there should be no queued tasks at first");
33 	async({});
34 	fibersQueued.empty.should.equal(false).because("there should be a single task");
35 }
36 
37 @("async should not immediately execute its task")
38 unittest
39 {
40 	scope(exit) fibersQueued = SList!Fiber();
41 	bool executed = false;
42 	auto executeIt = {executed = true;};
43 	async(executeIt);
44 	executed.should.equal(false).because("async should not execute its operand");
45 }
46 
47 /**
48 Runs the argument in a separate task, waiting for the result.
49 */
50 T await(T)(lazy T task)
51 in (Fiber.getThis() !is null && globalSync !is null)
52 {
53 	globalWaitingOnThreads++;
54 	shared finished = false;
55 
56 	auto semaphore = globalSync;
57 	T result;
58 	scopedTask(
59 	{
60 		scope(exit) finished = true;
61 		assert(semaphore !is null);
62 		result = task;
63 		semaphore.notify();
64 	}).executeInNewThread();
65 
66 	while (!finished)
67 	{
68 		Fiber.yield();
69 	}
70 	globalWaitingOnThreads--;
71 
72 	return result;
73 }
74 
75 @("await can run a quick thread")
76 unittest
77 {
78 	scope(exit) fibersQueued = SList!Fiber();
79 	bool executed = false;
80 	startScheduler(
81 	{
82 		await(executed = true);
83 	});
84 	executed.should.equal(true).because("a quick thread should run");
85 }
86 
87 @("await can run a slow thread")
88 unittest
89 {
90 	scope(exit) fibersQueued = SList!Fiber();
91 	bool executed = false;
92 
93 	bool largeTask()
94 	{
95 		import core.thread : Thread;
96 		Thread.sleep(2.seconds);
97 		executed = true;
98 		return true;
99 	}
100 
101 	startScheduler(
102 	{
103 		await(largeTask());
104 	});
105 	executed.should.equal(true).because("a slow thread should run");
106 }
107 
108 @("await should return the value that was calculated")
109 unittest
110 {
111 	scope(exit) fibersQueued = SList!Fiber();
112 	bool executed = false;
113 
114 	bool someTask()
115 	{
116 		return true;
117 	}
118 
119 	startScheduler(
120 	{
121 		executed = await(someTask());
122 	});
123 	executed.should.equal(true).because("a slow thread should run");
124 }
125 
126 /**
127 Starts the scheduler.
128 */
129 void startScheduler(void delegate() firstTask)
130 {
131 	globalSync = new Semaphore;
132 	async({firstTask();});
133 
134 	while (!fibersQueued.empty)
135 	{
136 		auto fibersRunning = fibersQueued;
137 		fibersQueued = SList!Fiber();
138 		foreach (Fiber fiber; fibersRunning)
139 		{
140 			fiber.call();
141 			if (fiber.state != Fiber.State.TERM)
142 				fibersQueued.insert(fiber);
143 		}
144 
145 		if (globalWaitingOnThreads > 0)
146 		{
147 			globalSync.wait();
148 		}
149 	}
150 }
151 
152 @("startScheduler should run initial task")
153 unittest
154 {
155 	scope(exit) fibersQueued = SList!Fiber();
156 	bool executed = false;
157 	startScheduler({executed = true;});
158 	executed.should.equal(true).because("startScheduler should execute the initial task");
159 }
160 
161 @("startScheduler should also run tasks registered before itself")
162 unittest
163 {
164 	scope(exit) fibersQueued = SList!Fiber();
165 	bool executed = false;
166 	async({executed = true;});
167 	startScheduler({});
168 	executed.should.equal(true).because("startScheduler should execute the task executed before itself");
169 }
170 
171 @("startScheduler should also run tasks registered by the initial task")
172 unittest
173 {
174 	scope(exit) fibersQueued = SList!Fiber();
175 	bool executed = false;
176 	startScheduler(
177 	{
178 		async({executed = true;});
179 	});
180 	executed.should.equal(true).because("startScheduler should execute the task created during the initial task");
181 }