元のコード

元のコード

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
package christie.codegear;

import java.util.Comparator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;


// https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172
/**
 * Factory for thread pools whose waiting tasks are executed in priority order.
 *
 * <p>Tasks that are {@code CodeGearExecutor} instances are ordered by their
 * {@code getPriority()} value (higher values run first). Any other task is
 * accepted with a default priority of 0. Tasks of equal priority run in the
 * order in which they were wrapped (FIFO).
 */
public class PriorityThreadPoolExecutors {

    /**
     * Creates a fixed-size pool backed by a priority-ordered work queue.
     *
     * @param nThreads      number of threads; used as both the core and the maximum pool size
     * @param keepAliveTime keep-alive time for idle threads, in milliseconds
     * @return a new executor that dequeues waiting tasks by priority
     * @throws IllegalArgumentException if the arguments are rejected by {@link ThreadPoolExecutor}
     */
    public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
        return new PriorityThreadPoolExecutor(nThreads, nThreads, keepAliveTime, TimeUnit.MILLISECONDS);
    }

    /** {@link ThreadPoolExecutor} that wraps every task so the work queue can order it. */
    private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
        /** Priority assigned to tasks that do not carry their own priority. */
        private static final int DEFAULT_PRIORITY = 0;
        /** Initial capacity of the (unbounded) priority queue. */
        private static final int INITIAL_QUEUE_CAPACITY = 10;
        /** Source of the sequence numbers used to keep equal-priority tasks in FIFO order. */
        private static final AtomicLong instanceCounter = new AtomicLong();

        public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                int keepAliveTime, TimeUnit unit) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, newPriorityQueue());
        }

        /**
         * Builds the work queue. The raw cast is required because a
         * {@code PriorityBlockingQueue<ComparableTask>} is not assignable to the
         * {@code BlockingQueue<Runnable>} that {@link ThreadPoolExecutor} expects; it is
         * safe because {@link #execute(Runnable)} only ever enqueues {@link ComparableTask}s.
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        private static BlockingQueue<Runnable> newPriorityQueue() {
            return (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(INITIAL_QUEUE_CAPACITY,
                    ComparableTask.comparatorByPriorityAndSequentialOrder());
        }

        /**
         * Executes the command, wrapping it in a {@link ComparableTask} first if needed.
         *
         * @throws NullPointerException if {@code command} is null
         */
        @Override
        public void execute(Runnable command) {
            if (command == null) {
                // Fail fast instead of queueing a wrapper that would fail later on a worker thread.
                throw new NullPointerException("command");
            }
            if (command instanceof ComparableTask) { // Already wrapped
                super.execute(command);
            } else {
                super.execute(newComparableRunnableFor(command));
            }
        }

        private Runnable newComparableRunnableFor(Runnable runnable) {
            return new ComparableRunnable(runnable);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new ComparableFutureTask<>(runnable, value);
        }

        /**
         * Returns the priority of the task: its own priority when it is a
         * {@code CodeGearExecutor}, otherwise {@link #DEFAULT_PRIORITY}.
         */
        private static int priorityOf(Runnable runnable) {
            return runnable instanceof CodeGearExecutor
                    ? ((CodeGearExecutor) runnable).getPriority()
                    : DEFAULT_PRIORITY;
        }

        /** Future-returning wrapper used for tasks passed to {@code submit(Runnable, ...)}. */
        private static class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
            private final long sequentialOrder = instanceCounter.getAndIncrement();
            private final Runnable task;

            ComparableFutureTask(Runnable task, T result) {
                super(task, result);
                this.task = task;
            }

            @Override
            public long getInstanceCount() {
                return sequentialOrder;
            }

            @Override
            public int getPriority() {
                return priorityOf(task);
            }
        }

        /** Plain wrapper used for tasks passed directly to {@code execute}. */
        private static class ComparableRunnable implements Runnable, ComparableTask {
            private final long instanceCount = instanceCounter.getAndIncrement();
            private final Runnable runnable;

            ComparableRunnable(Runnable runnable) {
                this.runnable = runnable;
            }

            @Override
            public void run() {
                runnable.run();
            }

            @Override
            public int getPriority() {
                return priorityOf(runnable);
            }

            @Override
            public long getInstanceCount() {
                return instanceCount;
            }
        }

        /** A runnable that exposes the two keys the work queue sorts by. */
        private interface ComparableTask extends Runnable {
            int getPriority();

            long getInstanceCount();

            /**
             * Orders tasks by descending priority, then by ascending sequence number.
             * Uses {@code Integer.compare}/{@code Long.compare} rather than subtraction so
             * that extreme values cannot overflow or be truncated into the wrong sign.
             */
            static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
                return (o1, o2) -> {
                    int priorityResult = Integer.compare(o2.getPriority(), o1.getPriority());
                    return priorityResult != 0 ? priorityResult
                            : Long.compare(o1.getInstanceCount(), o2.getInstanceCount());
                };
            }

        }

    }

}

# 読み解き

参考元? https://stackoverflow.com/questions/807223/how-do-i-implement-task-prioritization-using-an-executorservice-in-java-5/42831172#42831172


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class PriorityThreadPoolExecutors {

    /**
     * Builds a pool whose core and maximum sizes are both {@code nThreads};
     * {@code keepAliveTime} is interpreted in milliseconds.
     */
    public static ThreadPoolExecutor createThreadPool(int nThreads, int keepAliveTime) {
        final TimeUnit keepAliveUnit = TimeUnit.MILLISECONDS;
        final int coreSize = nThreads;
        final int maxSize = nThreads;
        return new PriorityThreadPoolExecutor(coreSize, maxSize, keepAliveTime, keepAliveUnit);
    }
    private static class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
        private static final int DEFAULT_PRIORITY = 0;
        private static AtomicLong instanceCounter = new AtomicLong();

        /**
         * Forwards the pool sizes, keep-alive time and unit to {@code ThreadPoolExecutor},
         * supplying a {@code PriorityBlockingQueue} (initial capacity 10) as the work queue.
         * The queue is ordered by {@code ComparableTask.comparatorByPriorityAndSequentialOrder()}.
         */
        public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                int keepAliveTime, TimeUnit unit) {
            // Raw BlockingQueue cast: a PriorityBlockingQueue<ComparableTask> is not assignable
            // to the BlockingQueue<Runnable> parameter of the superclass constructor.
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));
        }

ThreadPoolExecutorを継承したPriorityThreadPoolExecutorクラス
デフォルトの優先度は0
スレッド数と生存時間(keepAliveTime、単位は TimeUnit.MILLISECONDS)を指定する →スレッド数はcpu数によって変わる (Runtime.getRuntime().availableProcessors() で確かめたところ16だった)
https://stackoverflow.com/questions/11877947/runtime-getruntime-availableprocessors
c#だとこっち
https://oita.oika.me/2016/02/18/task-and-threadpool/

生存時間はintの最大値が使用されている

1
2
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
                    ComparableTask.comparatorByPriorityAndSequentialOrder()));

この部分のComparableTask.comparatorByPriorityAndSequentialOrder()は、PriorityBlockingQueueに渡す比較関数(Comparator)で、Queue内の要素はこの条件に従って順序付けされる
つまり、実行待ちのタスクが入っているQueue内で並べ替えが行われ、優先度の高いタスクから取り出される


1
2
3
4
5
6
7
8
9
/**
 * Hands the command to the underlying executor, wrapping it first unless it
 * already is a ComparableTask.
 */
@Override
public void execute(Runnable command) {
    // If this is ugly then delegator pattern needed
    final boolean alreadyWrapped = command instanceof ComparableTask;
    final Runnable task = alreadyWrapped ? command : newComparableRunnableFor(command);
    super.execute(task);
}

Runnableの実行部分
ComparableTaskでラップすることが前提
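→ComparableTaskは、優先度(getPriority)と生成順(getInstanceCount)を取得できるRunnableのインターフェース。比較そのものは comparatorByPriorityAndSequentialOrder() が返すComparatorが行う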


1
2
3
4
5
6
7
 /**
  * Orders tasks by descending priority, then by ascending instance count.
  * Integer.compare / Long.compare are used instead of subtraction so that
  * extreme values cannot overflow or be truncated into the wrong sign.
  */
 static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
    return (o1, o2) -> {
        // o2 before o1: a larger priority value sorts first.
        int priorityResult = Integer.compare(o2.getPriority(), o1.getPriority());
        // Equal priority: fall back to creation order (smaller count first).
        return priorityResult != 0 ? priorityResult
            : Long.compare(o1.getInstanceCount(), o2.getInstanceCount());
    };
}

比較部分
javaのComparatorはintを返すことで順序を比較する
→ returnされた値が負なら o1 を o2 より前に並べる (そのまま)
→ 正なら o2 を o1 より前に並べる (入れ替える)

0の場合優先度の差がないので、instance生成順で並び替える


# 書き換えの方針

https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0
C#だとTaskSchedulerを拡張して中のQueueの順番を優先度に従って更新する感じで実装するとできそう


# 参考ページ

http://normalse.hatenablog.jp/entry/2015/04/03/075443
https://qiita.com/sano1202/items/64593e8e981e8d6439d3

Licensed under CC BY-NC-SA 4.0
comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy