Use (Examples)
This entry gives a very simple example, in various languages.
The key methods are TPE.submit() and FutureObject.result() [.get() in some languages]. Also, FutureObject.done() [or .isDone()] is an important method.
There are many more methods, depending on the implementation. But without using the ones above, you are not using it properly.
There are basically three steps:
(1) Create the TPE object. This starts the workers (threads) and does the plumbing.
(2) Submit callables (functions) to the TPE, which distributes each one to an available worker, or queues it temporarily.
(3) Use the result later by reading the returned FutureObject.
When calling TPE.submit(), a FutureObject is returned directly. This is a kind of placeholder; it doesn’t contain the result yet! When the submitted function (eventually) returns, the return value is stored in that placeholder.
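The placeholder behaviour can be made visible with a small sketch in Python. It is only an illustration: the names demo_placeholder, slow_square and release are made up here, and the threading.Event is just a trick to keep the task "busy" until the main thread says so.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def demo_placeholder():
    release = threading.Event()              # hypothetical gate that holds the task back

    def slow_square(number):
        release.wait()                       # block until the main thread sets the event
        return number * number

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_square, 7) # returns directly, with an "empty" placeholder
        before = future.done()               # False: the task is still blocked
        release.set()                        # now let the task finish
        value = future.result()              # wait until the return value is stored
        after = future.done()                # True: the placeholder is filled
    return before, value, after

if __name__ == "__main__":
    print(demo_placeholder())                # prints (False, 49, True)
```

The Future object is the same object before and after; only its state changes once the worker has stored the return value.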
One can monitor the availability of the result with FutureObject.done(); only when it is True is the result available. Or, one can just wait for it by calling FutureObject.result(). Once that call returns, the future is surely done.
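Both styles, monitoring with done() and simply waiting in result(), could look roughly like this in Python. The helper names (slow_square, poll_then_read, just_wait) and the sleep times are assumptions chosen for the sketch, not part of any library.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(number):
    time.sleep(0.2)                          # simulate some work
    return number * number

def poll_then_read(poll_interval=0.05):
    """Monitor the future with done(); read the result once it is available."""
    polls = 0
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_square, 6)
        while not future.done():             # result not available yet
            polls += 1
            time.sleep(poll_interval)        # room to do other useful work
        return future.result(), polls        # returns at once: the future is done

def just_wait():
    """Skip the monitoring; result() blocks until the value is there."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(slow_square, 6).result()

if __name__ == "__main__":
    print(poll_then_read())
    print(just_wait())
```

Polling only pays off when there is something else to do in between; otherwise calling result() directly is simpler.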
Note: one can also give a wait-limit (timeout) to result(). See your language and/or implementation documentation for details.
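As an illustration for Python's concurrent.futures: result() accepts a timeout in seconds and raises concurrent.futures.TimeoutError when the limit passes. The function names and the None fallback below are assumptions of this sketch.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

def slow_square(number):
    time.sleep(0.5)                          # simulate some work
    return number * number

def read_with_limit(limit_seconds):
    """Return the result, or None when it is not available within the limit."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_square, 5)
        try:
            return future.result(timeout=limit_seconds)
        except FutureTimeout:
            return None                      # too late; the task itself keeps running

if __name__ == "__main__":
    print(read_with_limit(0.05))             # limit shorter than the task
    print(read_with_limit(5))                # limit long enough
```

Note that a timeout only ends the waiting, not the task. In Java, the counterpart is Future.get(timeout, unit).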
Python
#! python
import time, random
from concurrent.futures import ThreadPoolExecutor as TCE

TRIES=10
MAX=6

def taak(getal):
    time.sleep(getal/MAX)                   # Simulate a complicated calculation ...
    return getal, getal * getal             # ... returning the number and its square

def demo():
    workers = TCE(max_workers=4)            # (1)
    results = {}
    for n in range(TRIES):                  # (2) Submit all tasks; (with random number)
        results[n] = workers.submit(taak, random.randint(1, MAX)) # Store the Futures
    for n, f in results.items():            # (3) Print the results (in order)
        done = f.done()
        print("{n}: {g} ==> {r} ({done})".format(n=n, g=f.result()[0], r=f.result()[1],
                                                 done="direct" if done else "waited"))

if __name__ == '__main__':
    demo()
You can run this example with (using Python 3):
[albert@MESS:1]% python examples/TPE_demo_1.py
0: 5 ==> 25 (waited)
1: 3 ==> 9 (direct)
2: 6 ==> 36 (waited)
#...
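The Python demo never stops its workers explicitly, while the Java variant below calls shutdown(). A possible Python counterpart, sketched here under the assumption that waiting for all tasks at the end is acceptable, is to use the executor as a context manager. The name demo_with_shutdown and the shortened sleep are made up for this sketch.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

MAX = 6

def taak(getal):
    time.sleep(getal / (MAX * 10))           # shorter sleep than in the demo above
    return getal, getal * getal              # the number and its square

def demo_with_shutdown(tries=5):
    with ThreadPoolExecutor(max_workers=4) as workers:
        futures = [workers.submit(taak, random.randint(1, MAX)) for _ in range(tries)]
    # Leaving the with-block calls shutdown(wait=True): all tasks have finished here
    return [f.result() for f in futures]

if __name__ == "__main__":
    for getal, kwadraat in demo_with_shutdown():
        print("{g} ==> {r}".format(g=getal, r=kwadraat))
```

Because the with-block waits for all tasks, every result() call after it returns directly.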
Java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;

public class TPE_demo_2 {
    private static final int TRIES   = 10;
    private static final int WORKERS = 4;

    public static void main(String[] args) {
        demo();
    }

    private static void demo() {
        ThreadPoolExecutor workers = (ThreadPoolExecutor) Executors.newFixedThreadPool(WORKERS); //* (1)
        List<Future<Long>> resultList = new ArrayList<>();

        //* (2) Submit all tasks, with a random input
        for (int i = 1; i <= TRIES; i++) {
            Future<Long> future = workers.submit(new DemoTask((long) (Math.random() * 10)));
            resultList.add(future);         // And save the future result
        }

        //* (3) Now print all results; wait on them when needed [.get() does]
        for (Future<Long> f : resultList) {
            try {
                System.out.println("Future done (Y/N)? :" + f.isDone() + ".\tResult is: " + f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        //* Stop the workers
        workers.shutdown();
    }
}

class DemoTask implements Callable<Long> {  // Typed Callable, so submit() returns a Future<Long>
    Long number;

    public DemoTask(Long number) {
        this.number = number;               // JAVA: save the input of this "function"
    }

    public Long call() {
        try {
            TimeUnit.SECONDS.sleep(number/4); // Simulate a complicated calculation ...
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return number * number;             // ... returning the square
    }
}
To run this example, compile it first (in the examples dir):
[albert@MESS:2]% javac TPE_demo_2.java
[albert@MESS:3]% java TPE_demo_2 DemoTask
Future done (Y/N)? :true. Result is: 1
Future done (Y/N)? :false. Result is: 49
Future done (Y/N)? :false. Result is: 64
Future done (Y/N)? :true. Result is: 9
#...