JUC部分并发类使用方式

下面介绍的是JUC包下一些线程安全类的一些简单使用和一些小demo。

Semaphore

信号量,即可以同时使用的线程数,tryrequire就是将信号量减一,release就是信号量+1,当等于0就会阻塞,大于零才会唤醒。

当需要控制线程访问数量,可以使用信号量来做控制,比较简单。

下面是使用信号量改进的数据库连接池

@Slf4j
public class SemaphoreConnection {

    // 连接池对象数组
    private Connection[] connections;
    // 使用标记
    private AtomicIntegerArray flagArrays;
    // 线程池大小
    private Integer poolSize;
    /**
     * 信号量
     */
    Semaphore semaphore;

    public SemaphoreConnection(){
        this.poolSize = 5;
        connections = new MarkConnection[5];
        flagArrays = new AtomicIntegerArray(5);
        for (int i = 0; i < connections.length; i++) {
            connections[i] = new MarkConnection("连接" + i+1);
        }
        semaphore = new Semaphore(5);
    }

    // 连接池的初始化
    public SemaphoreConnection(int poolSize) {
        this.poolSize = poolSize;
        connections = new MarkConnection[poolSize];
        flagArrays = new AtomicIntegerArray(poolSize);
        for (int i = 0; i < connections.length; i++) {
            connections[i] = new MarkConnection("连接" + i);
        }
        semaphore = new Semaphore(poolSize);
    }

    // 向连接池中请求连接
    public Connection getConnection(){
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true){
            for (int i = 0; i < poolSize; i++) {
                // 进行cas请求,如果请求失败就失败
                if (flagArrays.compareAndSet(i,0,1)){
                    return connections[i];
                }
            }
        }
    }

    /**
     * 释放连接
     * @param con
     */
    public void releaseConnection(Connection con){
        for (int i = 0; i < poolSize; i++) {
            if (con == connections[i]){
                // 将连接标识置为0即空间连接
                flagArrays.set(i,0);
                semaphore.release();
            }
        }
    }

    public static void main(String[] args) {
        SemaphoreConnection myConnectionPoll = new SemaphoreConnection(3);
        for (int i = 0; i < 10; i++) {
            new Thread(() ->{
                MarkConnection connection =(MarkConnection) myConnectionPoll.getConnection();
                log.debug("获得锁{}",connection.getConName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                myConnectionPoll.releaseConnection(connection);
                log.debug("释放锁{}",connection.getConName());
            }).start();
        }
    }
}

CountDownLatch

就是一个用来计数的,来解决线程之间通信的问题。

比如我需要两个线程执行完毕后再去执行主线程,我们可以用join方法,但是我们同样可以使用这个类来控制。

通过一个count,当其被减为0时,调用await阻塞的线程就会唤醒。

// 一个简单的demo
@Slf4j
public class Test2 {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
//        for(int i = 0;i < 3;i++){
            new Thread(()->{
                log.debug("来啦");
                try {
                    Thread.sleep(1000);
                    countDownLatch.countDown();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("干啦兄弟们");
            }).start();
            new Thread(()->{
                log.debug("来啦");
                try {
                    Thread.sleep(2000);
                    countDownLatch.countDown();

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("干啦兄弟们");
            }).start();
//        }
        countDownLatch.await();
        log.debug("开吃了兄弟们");
    }

}

同时可以作为微服务下的多线程调用,当我们需要多个微服务的结果时,我们可以使用多线程,调用多线程池使用多线程的方法远程调用,可以使用该类控制主线程等待线程池中的线程执行完后执行主线程。但是使用Future可以解决问题,所以还是建议使用future,使用callable方法。

CyclicBarrier

作为一个和CountDownLatch功能相似,但是它却是可以循环使用的,当其置为0时,唤醒线程后,就会恢复到初始值。可以继续使用,而CountDownLatch却是一次性的,需要重复创建线程。

CyclicBarrier主要是调用一个await方法,等待其他线程,当到达数量的线程完成后,就会唤醒被await方法阻塞的线程。

// demo
@Slf4j
public class Test3 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        new Thread(()->{
            log.debug("来啦");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("干啦兄弟们");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(()->{
            log.debug("来啦");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("干啦兄弟们");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }).start();
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        log.debug("来了啊");
    }
}

ConcurrentHashMap

保证了多线程下hashmap的安全。

jdk1.7时会产生死链,因为jdk1.7时多线程下是在链表头往里面加,而 不是1.8在链表尾加,所以会出现多线程下链表出现环,无法结束程序。

Concurrent类在多线程下的表现为

  • 内部使用cas进行优化,可以支持较大的吞吐量
  • 弱一致性
    • 迭代时的弱一致性,当在迭代是被更新,迭代不会报错但是会表现出旧值
    • size的弱一致性,会读到旧值
    • 读取一致性

fast-fail快速失败就是当其他线程修改了,非安全容器就会让正在遍历的直接抛出异常。

正文到此结束
评论插件初始化中...
Loading...