Java高并发编程 (马士兵老师视频)笔记(二)并发容器

   本篇主要总结了:线程安全的单例模式和并发容器。其中并发容器包含:ConcurrentHashMap、ConcurrentSkipListMap、CopyOnWriteArrayList和队列相关的内部加锁的并发队列ConcurrentLinkedQueue 以及阻塞队列BlockingQueue (LinkedBlockingQueue、ArrayBlockingQueue 、DelayQueue 、TransferQueue、SynchronizedQueue )

参看java-面试-Java并发容器大合集


目录

1.线程安全的单例模式。

♣ 1.1  多线程安全单例模式(不使用同步锁)

♣ 1.2  多线程安全单例模式( 延迟/懒加载  使用同步方法)

♣ 1.3  多线程安全单例模式(延迟/懒加载  使用双重同步锁)

♣ 1.4  多线程安全单例模式( 延迟/懒加载  使用静态内部类)

2.并发容器

♣ 例2.1  多编程卖票

  ♣♣ 2.1.1 普通的思路来写,分析问题所在

  ♣♣ 2.1.2 使用线程安全的容器Vector

  ♣♣ 2.1.3  在判断和操作放在同步代码块中

  ♣♣ 2.1.4 使用队列(Queue)来实现

♣ 2.2 List、Map相关的

  ♣♣ 2.2.1 ConcurrentHashMap和ConcurrentSkipListMap

♣♣ 2.2.1 CopyOnWriteArrayList 写时复制容器

♣ 2.3 Queue(队列)相关

♣♣ 2.3.1 ConcurrentLinkedQueue 并发队列(内部加锁的)

♣♣ 2.3.2 BlockingQueue 阻塞式队列


 

1.线程安全的单例模式。

    单例模式就是说系统中对于某类只能有一个实例对象,不能出现第二个!面试中常常会被问到或者手写一个线程安全的单例模式,主要考察多线程情况下的线程安全问题。

♣ 1.1  多线程安全单例模式(不使用同步锁)

/*直接加载。缺点:在该类加载的时候就会直接new一个静态对象出来,当系统中这样的类较多时,就使得启动速度变慢。*/
public class SingletonDirectlyNew {
    private static SingletonDirectlyNew single = new SingletonDirectlyNew();  //直接初始化一个对象
    private SingletonDirectlyNew() { //构造方法私有,保证其他类对象不能直接new一个该对象的实例
        
    }
    public static SingletonDirectlyNew getSingle(){ //该类唯一的一个public方法
        return single;
    }
}

 

现在流行的设计都是讲“延迟加载”,可以在第一次使用的时候才初始化第一个该类的对象。

 

♣ 1.2  多线程安全单例模式( 延迟/懒加载  使用同步方法)

/*锁住了一个方法,锁的力度有点大*/
public class SingletonSynMethod {
    private static SingletonSynMethod instance;
    private SingletonSynMethod() {  //构造方法私有
        
    }
    public static synchronized SingletonSynMethod getInstance() {  //对获取实例的方法进行同步
        if (instance == null) {
            instance = new SingletonSynMethod();
        }
        return instance;
    }
}

 

♣ 1.3  多线程安全单例模式(延迟/懒加载  使用双重同步锁)

public class SingletonDoubleSyn {
    private static SingletonDoubleSyn instance;
    private SingletonDoubleSyn () {  //构造方法私有
        
    }
    public static SingletonDoubleSyn getInstance() {
        if (instance == null) {
            synchronized (SingletonDoubleSyn.class) {  //锁定new语句
                if (instance == null) {
                    instance = new SingletonDoubleSyn();
                }
            }
        }
        return instance;
    }
}

 

♣ 1.4  多线程安全单例模式( 延迟/懒加载  使用静态内部类)

/*不用加锁 也能实现懒加载*/
public class SingletonInner {
    private SingletonInner() {

    }
    private static class Inner { //静态内部类
        private static SingletonInner s = new SingletonInner();
    }
    private static  SingletonInner getSingle() {
        return Inner.s;
    }
}

 

 

2.并发容器

♣ 例2.1  多编程卖票

  ♣♣ 2.1.1 普通的思路来写,分析问题所在

/*下面程序模拟卖票可能会出现两个问题:①票卖重了 ②还剩最后一张票时,好几个线程同时抢,出现-1张票
* 出现上面两个问题主要是因为:①remove()方法不是原子性的 ②判断+操作不是原子性的*/
public class TicketSeller1 {
    static List<String> tickets = new ArrayList<>();
    static {
        for (int i=0; i<10000; i++) {  //共一万张票
            tickets.add("票编号--" + i);
        }
    }
    public static void main(String[] args) {
        for (int i=0; i<10; i++) {   //共10个线程卖票
            new Thread(()->{
                while(tickets.size() > 0) {  //判断余票
                    System.out.println("销售了..." + tickets.remove(0)); //操作减票
                }
            }).start();
        }
    }
}

 

  ♣♣ 2.1.2 使用线程安全的容器Vector

/*本程序虽然用了Vector作为容器,Vector中的方法都是原子性的,但是在判断size和减票的中间还是可能被打断的,即被减到-1张*/
public class TicketSeller2 {
    static Vector<String> tickets = new Vector<>();  //Vector是一个同步容器
    static {
        for (int i=0; i<100; i++) tickets.add("票编号-" + i);
    }

    public static void main(String[] args) {
        for (int i=0; i<10; i++) {
            new Thread(()->{
                while(tickets.size() > 0) {  //判断余票
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("销售了--"+tickets.remove(0));  //操作减票
                }
            }).start();
        }
    }
}

数组越界:

 

  ♣♣ 2.1.3  在判断和操作放在同步代码块中

/*将判断和操作外面加锁,程序完全没有功能上的问题,但是效率很低*/
public class TicketSeller3 {
    static List<String> tickets = new LinkedList<>();
    static {
        for (int i=0; i<100; i++) {  //共100张票
            tickets.add("票编号:" + i);
        }
    }
    public static void main(String[] args) {
        for (int i=0; i<10; i++) {   //共10个线程卖票
            new Thread(()->{
                while(true) {
                    synchronized (tickets) {
                        if (tickets.size() <= 0) break;  //判断 余票
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("销售了--" + tickets.remove(0)); //操作减票
                    }
                }

            }).start();
        }
    }
}

  ♣♣ 2.1.4 使用队列(Queue)来实现

/*ConcurrentLinkedQueue底层不是加锁的实现,而是ConcurrentSet,效率会高很多。
* Queue一开始不是空的。先poll,再判断tickets是不是空的,最后没有任何操作,所以不用加锁也不会出现任何问题*/
public class TicketSeller4 {
    static Queue<String> tickets = new ConcurrentLinkedQueue<>();
    static {
        for (int i=0; i<1000; i++) {
            System.out.println("票编号:" + i );
        }
    }

    public static void main(String[] args) {
        for (int i=0; i<10; i++) {
            new Thread( ()-> {
                while(true) {
                    String str = tickets.poll(); //poll方法是原子性的,拿出一张票
                    if(str == null) break;
                    else System.out.println("销售了.." + str);
                }
            }).start();
        }
    }
}

 

************************************************开始并发容器的部分了****************************************************

♣ 2.2 List、Map相关的

  ♣♣ 2.2.1 ConcurrentHashMap和ConcurrentSkipListMap

   马老师的视频是2017年的,那时候他应该是按照jdk7中HashMap的底层结构来讲的。jdk8中将之前数组+链表的底层结构改成了数组+链表+红黑树,我对于红黑树那一块还不是很明白,先按马老师讲的记下笔记吧!

    HashMap和HashTable的区别就是HashTable是线程安全的,支持并发操作,效率不够高,所有方法都加了锁,而HashMap线程不安全,实现相对简单,不支持并发操作。

    ConcurrentHashMap支持并发操作,整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表”部分“或”一段“的意思,所以很多地方都会将其描述为分段锁。简言之,ConcurrentHashMap 是一个个Segment 数组,Segment 通过继承 ReentrantLock 来加锁,所以每次需要加锁的操作锁住的是一个 segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。concurrencyLevel(并行级别/并发数/Segment 数)默认是 16,即 ConcurrentHashMap 有 16 个 Segments,所以理论上,最多可以同时支持 16 个线程并发,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。此段是Java7(HashMap结构是数组+链表)的,java8的HashMap结构是数组+链表+红黑树,暂时还没弄太明白。

    ConcurrentSkipListMap和Treemap插入时效率比较低,需要排好顺序。但是查的时候效率很高。

java8 ConcurrentHashMap

public class ConcurrentHashMapTest {
    public static void main(String[] args) {
        Map<String,String> map = new ConcurrentHashMap<>();
//        Map<String,String> map = new ConcurrentSkipListMap<>(); //同TreeMap,插入时效率比较低。查快


//        Map<String,String> map = new Hashtable<>();
//        Map<String,String> map = new HashMap<>();
//        Map<String,String> map = new TreeMap<>();  //插入时要排序,所以插入可能会比较慢

        Random r = new Random();
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);  //门闩计数器  100
        long start = System.currentTimeMillis(); //开始时间
        for (int i=0; i<threads.length; i++) {
            threads[i] = new Thread(()->{
                for (int j=0; j<10000; j++) {  //向map中加入1万个随机字符串
                    map.put("a" + r.nextInt(100000),"a"+r.nextInt(100000));
                }
                latch.countDown();  //每执行一个线程,就countdown一次
            });
        }
        Arrays.asList(threads).forEach(t->t.start());  //所有线程启动
        try {
            latch.await(); //主线程在这等着,直到countdown到0
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long end = System.currentTimeMillis();  //结束时间
        System.out.println(end - start);  //程序执行时间
    }
}

tips:Map和Set本质上是一样的,只是Set只有key,没有value,所以下面谈到的Map可以替换成Set。

  在不加锁的情况下,可以用:HashMap、TreeMap、LinkedHashMap。想加锁可以用Hashtable(用的非常少)。

  在并发量不是很高的情况下,可以用Collections.synchronizedXxx()方法,在该方法中传一个不加锁的容器(如Map),它返回一个加了锁的容器(容器中的所有方法加锁)!

   在并发性比较高的情况下,用ConcurrentHashMap ,如果并发性高且要排序的情况下,用ConcurrentSkipListMap。

 

♣♣ 2.2.1 CopyOnWriteArrayList 写时复制容器

CopyOnWriteArrayList在多线程环境下,写时效率低,读时效率高,适合写少读多的环境,比如事件监听器。

/*写时复制:添加元素的时候,会把这个容器复制一份,在复制的那份后面加一个新的,将引用指向复制的那份。
 *读的时候不用加锁,适合写的很少,读的特别多的时候。 */
public class CopyOnWriteListTest {
    public static void main(String[] args) {
        List<String> list =
//                new ArrayList<>();  //这个会出并发问题,最后size<100000,,运行时间:0.1秒多
//                new Vector<>(); //size=100000,,运行时间:0.1秒多
                new CopyOnWriteArrayList<>(); //size=100000,效率很低,因为一直在“复制、写”,运行时间:5秒多
        Random r = new Random();
        Thread[] threads = new Thread[100];
        for (int i=0; i<threads.length; i++) {  //起100个线程,每个线程向容器中加1000个数(最终应该是10万个数)
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    for (int j=0; j<1000; j++) list.add("a" + r.nextInt());
                }
            };
            threads[i] = new Thread(task);
        }
        runAndComputeTime(threads);
        System.out.println(list.size());
    }

    static void runAndComputeTime(Thread[] threads) {
        long start = System.currentTimeMillis();
        Arrays.asList(threads).forEach(t->t.start());
        Arrays.asList(threads).forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long end = System.currentTimeMillis();
        System.out.println(end-start);
    }
}

 

 

♣ 2.3 Queue(队列)相关

  ♣♣ 2.3.1 ConcurrentLinkedQueue 并发队列(内部加锁的)

public class ConcurrentQueue {
    public static void main(String[] args) {
        Queue<String> strs = new ConcurrentLinkedQueue<>();   //还有双端队列...Deque
        for (int i=0; i<10; i++) {
            //类似于add方法,如果是ArrayQueue,add方法可能会抛异常,但是offer方法不会抛异常,返回boolean类型即是否添加成功
            strs.offer("a"+i);
        }
        System.out.println(strs);  //[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
        System.out.println("队列原始大小:" + strs.size());   //队列原始大小:10
        //poll方法表示从头上拿出一个删掉;peek方法表示从头上拿出一个用一下不删。
        System.out.println("poll " + strs.poll() + "后的大小为:" + strs.size()); //poll a0后的大小为:9
        System.out.println("peek " + strs.peek() + "后的大小为:" + strs.size()); //peek a1后的大小为:9
    }
}

 

 

♣♣ 2.3.2 BlockingQueue 阻塞式队列

    ♣♣♣ LinkedBlockingQueue 无界阻塞式队列

/*无界阻塞式队列*/
public class LinkedBlockingQueueTest {
    static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
    static Random r = new Random();
    public static void main(String[] args) {
        new Thread(()->{  //1个生产者线程
            for (int i=0; i<100; i++) {
                try {
                    strs.put("a" + i);  //如果满了,就会等待
                    TimeUnit.MILLISECONDS.sleep(r.nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"producer").start();

        for (int i=0; i<5; i++) {  //5个消费者进程
            new Thread(()-> {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName()
                                + " take-" + strs.take()); //如果空了,就等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"customer"+i).start();
        }
    }
}

 

    ♣♣♣ ArrayBlockingQueue 有界阻塞式队列

/*有界阻塞式队列*/
public class ArrayBlockingQueueTest {
   static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //最多装10个
   static Random r = new Random();

    public static void main(String[] args) {
        for (int i=0; i<10; i++) {
            try {
                strs.put("a" + i);  //向容器中添加10个,就满了
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try { //strs已经满了,以下方法都加不进去,但是处理方式不同
            strs.put("aaa");//发现满了,就会等待,程序阻塞
            strs.add("aaa");  //已经满了,再往里面装就会报异常
            strs.offer("aaa");//不会报异常,但是加不进去,返回是否添加成功
            strs.offer("aaa",1,TimeUnit.SECONDS); //1秒钟后加不进去,就不往里面加了
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(strs);
    }
}

 

    ♣♣♣ DelayQueue 执行定时任务

   往DelayQueue里加的元素是按时间排好序的,该队列是无界的。另外元素要实现Delayed接口,而Delayed接口又继承了Comparable接口,所以该类元素需要实现compareTo()方法;并且每个元素记载着自己还有多长时间才能被拿走,还要实现getDelay()方法。

public class DelayQueueTest {
    static DelayQueue<MyTask> tasks = new DelayQueue<>();

    static class MyTask implements Delayed {  //实现Delayed接口
        long runningTime;
        String name;
        MyTask(long rt,String name) {
            this.runningTime = rt;
            this.name = name;
        }

        @Override
        public long getDelay(TimeUnit unit) { 
            return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;
            else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
                return 1;
            else  // ==
            return 0;
        }

        @Override
        public String toString() {
            return name + "--" + runningTime;
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        MyTask t1 = new MyTask(now + 1000, "task1"); //1 s 后执行 //②
        MyTask t2 = new MyTask(now + 2000, "task2"); //2 s后执行  //④
        MyTask t3 = new MyTask(now + 1500, "task3"); //1.5s后执行 //③
        MyTask t4 = new MyTask(now + 500, "task4");  //0.5s后执行 //①
        MyTask t5 = new MyTask(now + 2500, "task5"); //2.5s后执行 //⑤

        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);

        System.out.println(tasks);
        for (int i=0; i<5; i++) {
            try {
                System.out.println(tasks.take()); //按放进去的顺序拿出
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

    ♣♣♣ TransferQueue

    适用场景:消费者先启动,生产者生产一个东西的时候,不扔在队列里,而是直接去找有没有消费者,有的话直接扔给消费者,若没有消费者线程,调用transfer()方法就会阻塞,调用add()、offer()、put()方法不会阻塞。TransferQueue适用于更高的并发情况

/*消费者先启动,生产者生产一个东西的时候,不扔在队列里,而是直接去找有没有消费者,有的话直接扔给消费者,
 若没有消费者线程,调用transfer()方法就会阻塞,调用add()、offer()、put()方法不会阻塞。*/
public class TransferQueueTest {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
        new Thread(()->{ //消费者先启动,可以拿走aaa
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        strs.transfer("aaa");
//        strs.put("aaaa");  //add、offer
//        new Thread(()->{ //消费者在生产者后启动,拿不到aaa,程序阻塞
//            try {
//                System.out.println(strs.take());
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//        }).start();
    }
}

 

    ♣♣♣ SynchronizedQueue (特殊的TransferQueue,容量为0)

扔在队列的东西必须被消费者马上消费掉,否则就会出问题。

/*一种特殊的TransferQueue,生产的任何一个东西必须直接交给消费者消费,不能搁在容器里,容器的容量为0*/
public class SynchronizeQueueTest {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> strs = new SynchronousQueue<>();

        new Thread(()->{  //消费者线程
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        strs.put("aaaa"); //不能调用add(报错),add不进去,put阻塞,等待消费者消费,内部调用的transfer.
        System.out.println(strs.size());  //0
    }
}

 


线程池部分以后再总结。。。

Executor类图

 

  • 1
    点赞
  • 12
    收藏
    觉得还不错? 一键收藏
  • 7
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 7
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值