`
zhangzhenjj
  • 浏览: 27225 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

求优化-俩线程,一个读,一个写

阅读更多

 

 有锁实现方案:

package com.boco.sfmhandler.bolts.sender.sort;

import java.io.Serializable;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import com.boco.sfmhandler.model.message.SMessage;

/**
 * Lock-based, bounded queue that releases {@link SMessage}s in ascending
 * sort-id order.
 * <p>
 * Messages are kept in a {@link TreeMap} keyed by sort id. {@link #next()}
 * hands out the message whose key equals the next expected id; if that id has
 * not arrived within {@code delayNanoTime} (1.5s) since the previous output,
 * the smallest key currently held is released instead. Producers block in
 * {@link #put(long, SMessage)} while the map holds {@code maxBlockSize}
 * entries.
 * <p>
 * {@link #next()} may only be called from a single thread.
 * 
 * @author boy
 */
public class WaitSortBlockingQueue implements SortQueue<SMessage> {
	private static final long serialVersionUID = -4793564853425866287L;

	private WaitSortBlockingQueue() {
		super();
	}

	/** Guards {@link #map}; non-fair. */
	private final ReentrantLock lock = new ReentrantLock(false);
	/** Signalled by {@code put} when an entry is added; awaited by {@code next}. */
	private final Condition empty = lock.newCondition();
	/** Signalled by {@code next} when an entry is removed; awaited by {@code put}. */
	private final Condition full = lock.newCondition();

	/**
	 * Next expected sort id. The smallest id is unknown at start-up, so the
	 * first call is allowed to time out, adopts the id it released, and counts
	 * up from there.
	 */
	private final AtomicLong atomicLong = new AtomicLong(0);
	private final TreeMap<Long, SMessage> map = new TreeMap<Long, SMessage>();
	private final int maxBlockSize = 10000;
	private final TimeUnit delayNanoUnit = TimeUnit.NANOSECONDS;
	private final long delayNanoTime = delayNanoUnit.convert(1500L,
			TimeUnit.MILLISECONDS); // 1.5s
	/**
	 * Per-thread timestamp ({@link System#nanoTime()}) of the previous output.
	 */
	private ThreadLocal<Long> preOutputNanoTime = new ThreadLocal<Long>() {
		@Override
		protected Long initialValue() {
			return System.nanoTime();
		}
	};

	/**
	 * Blocks until a message can be released and returns it.
	 * <p>
	 * Keys smaller than the expected id that show up while still inside the
	 * delay window are removed and passed to {@link #failBackUp(SMessage)}
	 * without affecting the wait time of other messages.
	 * 
	 * @return the released message, or {@code null} if the calling thread was
	 *         interrupted (its interrupt status is restored)
	 */
	public SMessage next() {
		final ReentrantLock lock = this.lock;
		try {
			// Acquire outside the guarded try: if this throws, the lock is not
			// held, so neither unlock() nor signal() may be called.
			lock.lockInterruptibly();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			e.printStackTrace();
			return null;
		}
		SMessage sm = null;
		try {
			emptyAwait();
			/**
			 * Smallest key currently stored in the map.
			 */
			long firstKey;
			long curNanoTime;
			while ((firstKey = map.firstKey()) != atomicLong.get()
					&& ((curNanoTime = System.nanoTime()) - preOutputNanoTime
							.get()) < delayNanoTime) {
				if (firstKey < atomicLong.get()) {
					// Late arrival: back it up and move on. Kept in its own
					// local so an interrupt below cannot return it to the
					// caller as if it were a regular result.
					SMessage stale = map.remove(firstKey);
					full.signal();
					failBackUp(stale);
					emptyAwait();
					continue;
				}
				long awaitTime = delayNanoTime
						- (curNanoTime - preOutputNanoTime.get());
				empty.await(awaitTime, delayNanoUnit);
			}
			// NOTE(review): when the delay has expired, a key smaller than the
			// expected id is released here and moves the counter backwards -
			// confirm whether it should be backed up instead.
			sm = map.remove(firstKey);
			full.signal();
			atomicLong.set(firstKey + 1);
			// Stamp the real output time; a timestamp taken before a long
			// wait would shorten the next delay window.
			setPreOutputNanoTime(System.nanoTime());
		} catch (InterruptedException e) {
			// await() re-acquires the lock before throwing, so signalling is
			// legal here; pass the wake-up on.
			empty.signal();
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
		return sm;
	}

	/**
	 * Waits while the map is empty; {@code put} signals {@link #empty}.
	 * Must be called with {@link #lock} held.
	 * 
	 * @throws InterruptedException
	 *             if interrupted while waiting
	 */
	private void emptyAwait() throws InterruptedException {
		while (map.isEmpty()) {
			empty.await();
		}
	}

	/**
	 * Backup hook for messages that arrived too late (currently a no-op).
	 * 
	 * @param sm
	 *            the discarded message
	 */
	private void failBackUp(SMessage sm) {

	}

	private void setPreOutputNanoTime(long nanoTime) {
		preOutputNanoTime.set(nanoTime);
	}

	/**
	 * Timed variant; not implemented.
	 * 
	 * @return always {@code null}
	 */
	@Override
	public SMessage next(long timeout, TimeUnit unit) {
		return null;
	}

	/**
	 * Stores {@code sm} under {@code key}, blocking while the queue is full,
	 * then wakes the consumer. If the calling thread is interrupted the
	 * message is not stored and the interrupt status is restored.
	 */
	@Override
	public void put(long key, SMessage sm) {
		final ReentrantLock lock = this.lock;
		try {
			// Same reasoning as in next(): acquire before the guarded try.
			lock.lockInterruptibly();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			e.printStackTrace();
			return;
		}
		try {
			while (map.size() >= maxBlockSize) {
				full.await();
			}
			map.put(key, sm);
			empty.signal(); // wake the thread waiting in next()
		} catch (InterruptedException e) {
			full.signal();
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Stores {@code sm} under its own sort id.
	 */
	@Override
	public void put(SMessage sm) {
		put(sm.getSortId(), sm);
	}

	/** Lazy holder for the single shared instance. */
	private static class proxy implements Serializable {
		private static final long serialVersionUID = -3999942172367616131L;
		private final static WaitSortBlockingQueue waitSortBlockingQueue = new WaitSortBlockingQueue();
	}

	/**
	 * @return the shared instance (the same object on every call)
	 */
	public static WaitSortBlockingQueue newInstance() {
		return proxy.waitSortBlockingQueue;
	}
}

 无锁实现方案:(这个跳跃表貌似频繁的插入、移除有点慢,有木有更好的非阻塞、排序链表实现?)

package com.boco.sfmhandler.bolts.sender.sort;

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import com.boco.sfmhandler.model.message.SMessage;

/**
 * Lock-free, busy-waiting queue that releases {@link SMessage}s in ascending
 * sort-id order.
 * <p>
 * Messages live in a {@link ConcurrentSkipListSet} ordered by sort id.
 * {@link #next()} spins until the head of the set carries the expected id, or
 * until 1.5s have passed since the previous output, in which case the current
 * head is released. {@link #put(SMessage)} spins while {@code maxSize}
 * elements are held.
 */
public class NoAwaitSortQueue implements SortQueue<SMessage> {
	private static final long serialVersionUID = -6596506016522782898L;
	private static final int maxSize = 10000;
	private static final Logger logger = Logger.getLogger(NoAwaitSortQueue.class);

	/**
	 * Per-thread timestamp ({@link System#nanoTime()}) of the previous output.
	 */
	private ThreadLocal<Long> preOutputNanoTime = new ThreadLocal<Long>() {
		@Override
		protected Long initialValue() {
			return System.nanoTime();
		}
	};
	/** Per-thread next expected sort id; starts at 0. */
	private ThreadLocal<Long> index = new ThreadLocal<Long>() {
		@Override
		protected Long initialValue() {
			return 0L;
		}
	};
	/** Per-thread maximum wait for a missing id, in nanoseconds. */
	private ThreadLocal<Long> delayNanoTime = new ThreadLocal<Long>() {
		@Override
		protected Long initialValue() {
			return TimeUnit.NANOSECONDS.convert(1500L, TimeUnit.MILLISECONDS); // 1.5s
		}
	};

	/**
	 * Number of elements in the set. ConcurrentSkipListSet.size() walks the
	 * whole set on every call, which is far too expensive for the spin loop in
	 * put(); this counter is updated on every successful add/remove instead.
	 */
	private final java.util.concurrent.atomic.AtomicInteger elementCount = new java.util.concurrent.atomic.AtomicInteger(
			0);

	private final ConcurrentSkipListSet<SMessage> concurrentSkipListSet = new ConcurrentSkipListSet<SMessage>(
			new Comparator<SMessage>() {
				@Override
				public int compare(SMessage o1, SMessage o2) {
					return compareLongs(o1.getSortId(), o2.getSortId());
				}
			});

	/**
	 * Three-way comparison that cannot overflow (unlike the sign of
	 * {@code left - right}).
	 * 
	 * @return -1, 0 or 1
	 */
	private static int compareLongs(long left, long right) {
		if (left < right) {
			return -1;
		}
		return left > right ? 1 : 0;
	}

	/**
	 * Adds {@code value}, spinning while the queue holds {@code maxSize}
	 * elements. A message whose sort id is already present is not added.
	 */
	@Override
	public void put(SMessage value) {
		while (elementCount.get() >= maxSize) {
			// busy-wait until the consumer makes room
		}
		if (concurrentSkipListSet.add(value)) {
			elementCount.incrementAndGet();
		}
	}

	/**
	 * Same as {@link #put(SMessage)}; {@code key} is ignored because the set
	 * orders by the message's own sort id.
	 */
	@Override
	public void put(long key, SMessage value) {
		put(value);
	}

	/**
	 * Spins until a message can be released and returns it.
	 * <ul>
	 * <li>head id equals the expected id: release it;</li>
	 * <li>head id is below the expected id: remove it and hand it to
	 * {@link #failBackUp(SMessage)};</li>
	 * <li>head id is above the expected id: release it only once the delay
	 * since the previous output has elapsed.</li>
	 * </ul>
	 */
	@Override
	public SMessage next() {
		while (true) {
			if (concurrentSkipListSet.isEmpty()) {
				continue;
			}
			SMessage sm = concurrentSkipListSet.first();
			long sortId = sm.getSortId();
			long curNanoTime = System.nanoTime();
			int order = compareLongs(sortId, index.get());
			if (order == 0) {
				if (checkSm(sm, curNanoTime, sortId)) {
					return sm;
				}
			} else if (order < 0) {
				if (removeCounted(sm)) {
					failBackUp(sm);
				}
			} else if ((curNanoTime - preOutputNanoTime.get()) >= delayNanoTime
					.get()) {
				if (checkSm(sm, curNanoTime, sortId)) {
					return sm;
				}
			}
		}
	}

	/**
	 * Removes {@code sm} from the set, keeping {@link #elementCount} in step.
	 * 
	 * @return {@code true} if this call removed the element
	 */
	private boolean removeCounted(SMessage sm) {
		boolean removed = concurrentSkipListSet.remove(sm);
		if (removed) {
			elementCount.decrementAndGet();
		}
		return removed;
	}

	/**
	 * Tries to claim {@code sm} for output; on success records the output time
	 * and advances the expected id to {@code sortId + 1}.
	 * 
	 * @return {@code true} if the message was removed by this call
	 */
	private boolean checkSm(SMessage sm, long curNanoTime, long sortId) {
		boolean b = removeCounted(sm);
		if (b) {
			preOutputNanoTime.set(curNanoTime);
			index.set(sortId + 1);
		}
		return b;
	}

	/**
	 * Backup hook for messages that arrived too late (currently a no-op).
	 */
	private void failBackUp(SMessage smessage) {

	}

	/**
	 * Timed variant; not implemented.
	 * 
	 * @return always {@code null}
	 */
	@Override
	public SMessage next(long timeout, TimeUnit unit) {
		return null;
	}

	/** Lazy holder for the single shared instance. */
	private static class proxy {
		private static NoAwaitSortQueue noAwaitSortQueue = new NoAwaitSortQueue();
	}

	/**
	 * @return the shared instance (the same object on every call)
	 */
	public static NoAwaitSortQueue newInstance() {
		return proxy.noAwaitSortQueue;
	}

	/**
	 * Demo: one thread puts messages with increasing ids forever, another
	 * takes them and logs each id.
	 */
	public static void main(String[] args) {
		new Thread(new Runnable() {

			@Override
			public void run() {
				for (long i = 0; true; i++) {
					SMessage sm = new SMessage();
					sm.setSortId(i);
					newInstance().put(sm);
				}
			}
		}).start();

		new Thread(new Runnable() {

			@Override
			public void run() {
				while (true) {
					SMessage sm = newInstance().next();
					logger.info(sm.getSortId());
				}
			}
		}).start();
	}

}

 

3
3
分享到:
评论
1 楼 zhangzhenjj 2013-09-12  
先自己顶一个!!!!!        

相关推荐

Global site tag (gtag.js) - Google Analytics