Skip to content

设计模式

设计模式-保护性暂停

即Guarded Suspension,用在一个线程等待另一 个线程的执行结果

  • 有一个结果需要从一个线程传递到另一 个线程,让他们关联同一一个GuardedObject
  • 如果有结果不断从一个线程到另-一个线程那么可以使用消息队列(见生产者消费者)
  • JDK中,join的实现、Future的实现,采用的就是此模式
  • 因为要等待另一一方的结果,因此归类到同步模式

image.png

java
class GuardedObject {
    private Object response;

    public Object get() {
        return get(0);
    }


    public Object get(long timeout) {
        long begin = System.currentTimeMillis();
        long passed = 0;
        synchronized (this) {
            while (response == null) {
                // 防止虚假唤醒
                long waitTime = timeout - passed;
                if (waitTime <= 0) {
                    break;
                }

                try {
                    this.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                passed = System.currentTimeMillis() - begin;

            }
            return response;
        }
    }

    public void complete(Object obj) {
        synchronized (this) {
            this.response = obj;
            this.notifyAll();
        }
    }
}
java
public final synchronized void join(long millis)
throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;
    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }
    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}

异步模式-生产者/消费者

  • 与前面的保护性暂停中的GuardObject不同,不需要产生结果和消费结果的线程一-对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK中各种阻塞队列,采用的就是这种模式

image.png

java
@ToString
final class Message {
    private final int id;
    private final String message;

    public Message(int id, String message) {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public String getMessage() {
        return message;
    }
}
java
@Slf4j(topic = "c.MessageQueue")
class MessageQueue extends LinkedList<Message> {
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    public Message take() {
        Message message = null;
        synchronized (this) {
            while (this.isEmpty()) {
                log.debug("queue is empty");
                try {

                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            message = this.removeFirst();
            this.notifyAll();
        }
        return message;
    }

    public void put(Message message) {
        synchronized (this) {
            while (this.size() >= this.capacity) {
                log.debug("queue is full");
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("qut ");
            this.addLast(message);
            this.notifyAll();
        }
    }
}
java
public static void main(String[] args) throws InterruptedException {
    MessageQueue queue = new MessageQueue(2);
    for (int i = 0; i < 3; i++) {
        int finalI = i;
        new Thread(() -> {
            queue.put(new Message(finalI, "value" + finalI));
        }, "producer" + i).start();
    }
    new Thread(() -> {
        while (true){
            log.info("" + queue.take());
        }
    }, "consumer").start();
}

同步模式-固定运行顺序

wait notify

pack & unpack

同步模式-交替输出

wait notify

题目:三个线程 分别依次打印 abc 因为线程的调度是随机的,所以通过等待标记,标记下一轮的输出结果

java
public class SyncModel {
    public static void main(String[] args) {
        WaitNotify waitNotify = new WaitNotify(1, 5);
        new Thread(() -> {
            try {
                waitNotify.print("a", 1, 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "a").start();
        new Thread(() -> {
            try {
                waitNotify.print("b", 2, 3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "b").start();
        new Thread(() -> {
            try {
                waitNotify.print("c", 3, 1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "c").start();
    }
}

@Slf4j
@Data
@AllArgsConstructor
class WaitNotify {
    public void print(String str, int waitFlag, int nextFlag) throws InterruptedException {
        for (int i = 0; i < this.loopNumber; i++) {
            synchronized (this) {
                while (flag != waitFlag) {
                    this.wait();
                }
                log.info(str);
                this.setFlag(nextFlag);
                this.notifyAll();
            }
        }
    }
    
    private int flag;
    private int loopNumber;
}

await & signal

java
public class SyncModel2 {
    public static void main(String[] args) {
        AwaitSignal awaitSignal = new AwaitSignal(5);
        Condition a = awaitSignal.newCondition();
        Condition b = awaitSignal.newCondition();
        Condition c = awaitSignal.newCondition();

        new Thread(() -> awaitSignal.print("a", a, b), "a").start();
        new Thread(() -> awaitSignal.print("b", b, c), "b").start();
        new Thread(() -> awaitSignal.print("c", c, a), "c").start();

        awaitSignal.lock();
        try {
            a.signal();

        } finally {
            awaitSignal.unlock();
        }

    }
}

@Data
@AllArgsConstructor
class AwaitSignal extends ReentrantLock {
    private int loopNumber;

    public void print(String str, Condition condition, Condition next) {
        for (int i = 0; i < loopNumber; i++) {
            lock();
            try {
                condition.await();
                System.out.print(str);
                next.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                unlock();
            }
        }
    }
}

park & unpack