1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
package christie.codegear;
import java.util.Comparator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
// https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172
/**
 * Factory for thread pools whose waiting tasks are ordered by priority.
 *
 * <p>Queued tasks are ordered so that a numerically higher priority is dequeued first;
 * tasks with equal priority are dequeued in the order in which they were wrapped (FIFO).
 * Priority is read from {@code CodeGearExecutor#getPriority()}; any other {@link Runnable}
 * (including the {@code FutureTask} created by {@code submit(Callable)}) is scheduled with
 * the default priority {@code 0}.
 */
public class PriorityThreadPoolExecutors {

    /**
     * Creates a pool whose core and maximum size are both {@code nThreads}.
     *
     * <p>NOTE(review): because core size equals maximum size, {@code keepAliveTime} only has an
     * effect if the caller enables {@code allowCoreThreadTimeOut(true)} on the returned pool
     * (see the {@link ThreadPoolExecutor} documentation) - confirm this is intended.
     *
     * @param nThreads      number of worker threads (used as both core and maximum pool size)
     * @param keepAliveTime idle keep-alive time in milliseconds
     * @return a new priority-ordered {@link ThreadPoolExecutor}
     */
    public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
        return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS);
    }

    /** {@link ThreadPoolExecutor} backed by a {@link PriorityBlockingQueue} of {@link ComparableTask}s. */
    private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
        /** Priority assigned to tasks that are not a {@code CodeGearExecutor}. */
        private static final int DEFAULT_PRIORITY = 0;
        /** Initial capacity of the work queue; the queue itself is unbounded. */
        private static final int INITIAL_QUEUE_CAPACITY = 10;
        /** Source of monotonically increasing sequence numbers used to break priority ties. */
        private static final AtomicLong instanceCounter = new AtomicLong();

        public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                          int keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, newPriorityQueue());
        }

        /**
         * Builds the work queue with proper generics instead of a raw cast.
         * Every task reaching the queue has passed through {@link #execute(Runnable)}, which
         * guarantees it is a {@link ComparableTask}, so the casts in the comparator are safe.
         */
        private static BlockingQueue<Runnable> newPriorityQueue() {
            final Comparator<ComparableTask> order = ComparableTask.comparatorByPriorityAndSequentialOrder();
            return new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_CAPACITY,
                    (Runnable a, Runnable b) -> order.compare((ComparableTask) a, (ComparableTask) b));
        }

        /**
         * Submits {@code command}, wrapping it in a {@link ComparableTask} unless it already is one.
         *
         * @throws NullPointerException if {@code command} is null
         */
        @Override
        public void execute(Runnable command) {
            if (command == null) {
                // Fail fast here rather than inside the comparator or a worker thread.
                throw new NullPointerException("command");
            }
            if (command instanceof ComparableTask) { // already wrapped
                super.execute(command);
            } else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(runnable);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(runnable, value);
        }

        /**
         * Returns the priority of {@code task}: its own priority when it is a
         * {@code CodeGearExecutor}, otherwise {@link #DEFAULT_PRIORITY}.
         * Evaluated on every call so that the task's current priority is always used.
         */
        private static int priorityOf(Runnable task) {
            return task instanceof CodeGearExecutor
                    ? ((CodeGearExecutor) task).getPriority()
                    : DEFAULT_PRIORITY;
        }

        /** Future-returning task (created by {@code submit(Runnable, ...)}) that carries ordering data. */
        private static class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private final long sequentialOrder = instanceCounter.getAndIncrement();
            private final Runnable delegate;

            public ComparableFutureTask(Runnable priorityRunnable, T result) {
                super(priorityRunnable, result);
                this.delegate = priorityRunnable;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return priorityOf(delegate);
            }
        }

        /** Plain wrapper used by {@link #execute(Runnable)} for tasks that are not yet comparable. */
        private static class ComparableRunnable implements Runnable, ComparableTask {
            private final long instanceCount = instanceCounter.getAndIncrement();
            private final Runnable runnable;

            public ComparableRunnable(Runnable priorityRunnable) {
                this.runnable = priorityRunnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return priorityOf(runnable);
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        /** A runnable that exposes the two keys the work queue sorts by. */
        private interface ComparableTask extends Runnable {
            /** @return the scheduling priority; higher values are dequeued first */
            int getPriority();

            /** @return the creation sequence number; lower values are dequeued first on ties */
            long getInstanceCount();

            /**
             * Orders by descending priority, then ascending sequence number.
             * Uses {@code Integer.compare}/{@code Long.compare} rather than subtraction, which
             * can overflow (or be truncated by an int cast) and yield the wrong sign.
             */
            static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = Integer.compare(o2.getPriority(), o1.getPriority());
                    return priorityResult != 0 ? priorityResult
                            : Long.compare(o1.getInstanceCount(), o2.getInstanceCount());
                };
            }
        }
    }
}
|
#
読み解き
参考元?
https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public class PriorityThreadPoolExecutors {
public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS);
}
private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
private static final int DEFAULT_PRIORITY = 0;
private static AtomicLong instanceCounter = new AtomicLong();
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
int keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
ComparableTask.comparatorByPriorityAndSequentialOrder()));
}
|
ThreadPoolExecutorを継承したPriorityThreadPoolExecutorクラス
デフォルトの優先度は0
スレッド数と生存時間を指定
→スレッド数はcpu数によって変わる (確かめたところ16だった)
Runtime.getRuntime().availableProcessors()
https://stackoverflow.com/questions/11877947/runtime-getruntime-availableprocessors
c#だとこっち
https://oita.oika.me/2016/02/18/task-and-threadpool/
生存時間はintの最大値が使用されている
1
2
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
ComparableTask.comparatorByPriorityAndSequentialOrder()));
|
この部分のComparableTask.comparatorByPriorityAndSequentialOrder()
はこの条件のもとでQueue内の入れ替えを行うという処理
おそらく実行待ちのQueue内で入れ替えを行う
1
2
3
4
5
6
7
8
9
|
/**
 * Hands the task to the underlying pool, making sure it is a ComparableTask first:
 * tasks that are already wrapped pass through unchanged, anything else is wrapped.
 */
@Override
public void execute(Runnable command) {
    // If this is ugly then delegator pattern needed
    final Runnable task = (command instanceof ComparableTask)
            ? command                              // already wrapped
            : newComparableRunnableFor(command);   // wrap so the queue can order it
    super.execute(task);
}
|
Runnableの実行部分
ComparableTaskでラップすることが前提
→ComparableTaskは優先度比較のクラス??
1
2
3
4
5
6
7
|
/**
 * Orders tasks by descending priority, then by ascending instance count (creation order).
 * Uses Integer.compare / Long.compare instead of subtraction: a subtraction can overflow
 * for extreme priorities, and casting a long difference to int can flip its sign.
 */
static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
    return (o1, o2) -> {
        int priorityResult = Integer.compare(o2.getPriority(), o1.getPriority());
        return priorityResult != 0 ? priorityResult
                : Long.compare(o1.getInstanceCount(), o2.getInstanceCount());
    };
}
|
比較部分
javaのComparatorはintを返すことで順序を比較する
→ returnの値が負(例: -1)なら o1 が o2 より前に並ぶ（そのまま）
→ 正(例: 1)なら o2 が o1 より前に並ぶ（入れ替える）
0の場合優先度の差がないので、instance生成順で並び替える
#
書き換えの方針
https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0
C#だとTaskSchedulerを拡張して中のQueueの順番を優先度に従って更新する感じで実装するとできそう
#
参考ページ
http://normalse.hatenablog.jp/entry/2015/04/03/075443
https://qiita.com/sano1202/items/64593e8e981e8d6439d3