成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

KeyAffinityExecutor 線程池

開發 前端
針對上面的場景我們可以通過 KeyAffinityExecutor (KeyAffinityExecutor 是一個可以按照指定的Key親和順序消費的執行器) 來解決這個問題,我們下面一起來了解下 KeyAffinityExecutor 。

線上案例

有一批量的數據,可以按照一個固定的 key 分組并發,但是要保證組內并行的處理。 比如:商城中,不同的用戶可以并發下單,但是一個用戶只能進行順序的下單。在全局并發的場景下保證局部有序,保證最小事務單元操作的原子性。

圖片

針對上面的場景我們可以通過 KeyAffinityExecutor (KeyAffinityExecutor 是一個可以按照指定的Key親和順序消費的執行器) 來解決這個問題,我們下面一起來了解下 KeyAffinityExecutor 。

基本使用

導入依賴

<dependency>
  <groupId>com.github.phantomthief</groupId>
  <artifactId>more-lambdas</artifactId>
  <version>0.1.55</version>
</dependency>

創建線程池

public class KeyAffinityExecutorTest {

    @Test
    public void submitTaskKeyAffinityExecutor() {
        //線程池
        KeyAffinityExecutor keyAffinityExecutor = KeyAffinityExecutor
                .newSerializingExecutor(2, 200, "測試-%d");

        //需要下單的信息
        List<Order> orders = new ArrayList<>();
        orders.add(new Order(1, "iPhone 16 Max"));
        orders.add(new Order(1, "Thinking In Java"));
        orders.add(new Order(1, "MengNiu Milk"));
        orders.add(new Order(2, "Thinking In Java"));
        orders.add(new Order(3, "HUAWEI 100P"));
        orders.add(new Order(4, "XIAOMI 20"));
        orders.add(new Order(5, "OPPO 98"));
        orders.add(new Order(6, "HP EC80"));
        orders.add(new Order(7, "BBK 100P"));
        orders.add(new Order(8, "TCL 1380"));
        orders.add(new Order(9, "CHANGHONG 32"));

        orders.forEach(order -> keyAffinityExecutor.submit(order.getAccountId(), () -> {
            System.out.println(Thread.currentThread() + " accountId:" + order.getAccountId() +
                    ", skuNo:" + order.getSkuNo() + " checkout success!");
            return null;
        }));

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        Assert.assertTrue(true);
    }


    @Data
    @AllArgsConstructor
    public static class Order {
        long accountId;

        String skuNo;
    }
}

輸出結果如下:

Thread[測試-0,5,main] accountId:1, skuNo:iPhone 16 Max checkout success!
Thread[測試-1,5,main] accountId:2, skuNo:Thinking In Java checkout success!
Thread[測試-1,5,main] accountId:3, skuNo:HUAWEI 100P checkout success!
Thread[測試-1,5,main] accountId:4, skuNo:XIAOMI 20 checkout success!
Thread[測試-0,5,main] accountId:1, skuNo:Thinking In Java checkout success!
Thread[測試-1,5,main] accountId:6, skuNo:HP EC80 checkout success!
Thread[測試-0,5,main] accountId:1, skuNo:MengNiu Milk checkout success!
Thread[測試-1,5,main] accountId:8, skuNo:TCL 1380 checkout success!
Thread[測試-0,5,main] accountId:5, skuNo:OPPO 98 checkout success!
Thread[測試-0,5,main] accountId:7, skuNo:BBK 100P checkout success!
Thread[測試-0,5,main] accountId:9, skuNo:CHANGHONG 32 checkout success!

結論:對于 acccountId = 1 有三條數據都是在同一個線程下面執行,線程ID:測試-0 所以可以保證局部有序。

實現原理

  1. 選擇執行的線程池, 這里我們可以看到,如果當前 key 存在線程池就直接返回,如果不存在就創建,或者選擇一個任務比較少的線程池,這里可以保證任務分發的均勻性。
//通過 key 選出一個執行線程
@Nonnull
public V select(K key) {
    int thisCount = count.getAsInt();
    tryCheckCount(thisCount);
    KeyRef keyRef = mapping.compute(key, (k, v) -> {
        // 如果不存在就創建一個
        if (v == null) {
            if (usingRandom.test(thisCount)) {
                do {
                    try {
                        v = new KeyRef(all.get(ThreadLocalRandom.current().nextInt(all.size())));
                    } catch (IndexOutOfBoundsException e) {
                        // ignore
                    }
                } while (v == null);
            } else {
                v = all.stream()
                        .min(comparingInt(ValueRef::concurrency))
                        .map(KeyRef::new)
                        .orElseThrow(IllegalStateException::new);
            }
        }
        v.incrConcurrency();
        return v;
    });
    return keyRef.ref();
}
  1. 執行線程池的初始化, 這里的本質是創建只有一個線程的線程池。這樣就可以保證,任務被路由到同一個 key 下面,那么就可以保證順序執行。
static Supplier<ExecutorService> executor(String threadName, int queueBufferSize) {
        return new Supplier<ExecutorService>() {

            // ThreadFactory
            private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
                    .setNameFormat(threadName)
                    .build();

            @Override
            public ExecutorService get() {
                LinkedBlockingQueue<Runnable> queue;
                if (queueBufferSize > 0) {
                    // blockingQueue
                    queue = new LinkedBlockingQueue<Runnable>(queueBufferSize) {

                        @Override
                        public boolean offer(Runnable e) {
                            try {
                                //讓 offer 方法阻塞,
                                //為什么這么做可以看 ThreadPoolExecutor 1347 行
                                put(e);
                                return true;
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt();
                            }
                            return false;
                        }
                    };
                } else {
                    queue = new LinkedBlockingQueue<>();
                }
                //創建一個線程的線程池
                return new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, queue, threadFactory);
            }
        };
    }
  1. 最后任務執行完畢,回收線程。
//當每一個key執行完之后回收處理這個key的線程池.
public void finishCall(K key) {
//如果執行完畢后返回 null
mapping.computeIfPresent(key, (k, v) -> {
    if (v.decrConcurrency()) {
        return null;
    } else {
        return v;
    }
});
}

總結,這里其實我們也可以通過只有一個線程的線程數組實現,來實現按照唯一key,進行 hash 路由。

參考地址

https://github.com/PhantomThief/more-lambdas-java

責任編輯:武曉燕 來源: 運維開發故事
相關推薦

2024-07-15 08:20:24

2023-10-13 08:20:02

Spring線程池id

2012-05-15 02:18:31

Java線程池

2020-12-10 08:24:40

線程池線程方法

2023-06-07 13:49:00

多線程編程C#

2025-01-09 11:24:59

線程池美團動態配置中心

2017-01-10 13:39:57

Python線程池進程池

2019-12-27 09:09:42

Tomcat線程池JDK

2012-02-29 13:26:20

Java

2024-12-13 08:21:04

2024-11-21 07:00:00

線程池Java開發

2020-09-04 10:29:47

Java線程池并發

2013-05-28 13:57:12

MariaDB

2011-06-22 15:50:45

QT 線程

2020-03-05 15:34:16

線程池C語言局域網

2023-11-22 08:37:40

Java線程池

2013-05-23 15:59:00

線程池

2013-06-08 13:07:23

Java線程池調度器

2021-09-11 15:26:23

Java多線程線程池

2015-08-20 09:17:36

Java線程池
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日韩美香港a一级毛片免费 国产综合av | 国产精品精品久久久久久 | 国产成人免费观看 | 久久久视频在线 | 美女黄网站 | 国产成人精品免费视频大全最热 | 一区二区视频在线 | 国内自拍偷拍 | 天天干狠狠干 | 99中文字幕| 亚洲精品久久嫩草网站秘色 | 波多野结衣先锋影音 | 亚洲一区二区三区免费在线观看 | 91视视频在线观看入口直接观看 | 精品国产18久久久久久二百 | 97国产爽爽爽久久久 | 午夜av一区二区 | 久久久一区二区 | 国产1页| 欧美精品一区二区三区在线 | 狠狠草视频 | 久久精品中文字幕 | 91极品视频 | 久久国际精品 | 一级黄色片在线免费观看 | 久久精品免费 | 69av在线视频 | 久草欧美视频 | 黄色大片免费看 | 国产国语精品 | 丁香五月网久久综合 | 国产精品久久久久久久久久久久冷 | 久色视频在线 | 日韩在线大片 | 欧美成人精品 | 日韩一区二区三区视频在线播放 | 中文字幕在线网 | jizz亚洲人 | 一级黄在线观看 | 欧美精品在线播放 | 日韩欧美在线播放 |