有锁实现方案:
package com.boco.sfmhandler.bolts.sender.sort; import java.io.Serializable; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import com.boco.sfmhandler.model.message.SMessage; /** * 只能单线程调用next * * @author boy */ public class WaitSortBlockingQueue implements SortQueue<SMessage> { private static final long serialVersionUID = -4793564853425866287L; private WaitSortBlockingQueue() { super(); } private final ReentrantLock lock = new ReentrantLock(false); private final Condition empty = lock.newCondition(); private final Condition full = lock.newCondition(); private final AtomicLong atomicLong = new AtomicLong(0); // 初始时拿不到最小的sm的id,所以让其超时出去,并获取该id,以后进行累加 private final TreeMap<Long, SMessage> map = new TreeMap<Long, SMessage>(); private final int maxBlockSize = 10000; private final TimeUnit delayNanoUnit = TimeUnit.NANOSECONDS; private final long delayNanoTime = delayNanoUnit.convert(1500L, TimeUnit.MILLISECONDS); // 1.5s /** * 线程私有 */ private ThreadLocal<Long> preOutputNanoTime = new ThreadLocal<Long>() { @Override protected Long initialValue() { return System.nanoTime(); } }; public SMessage next() { long curNanoTime = System.nanoTime(); final ReentrantLock lock = this.lock; SMessage sm = null; try { lock.lockInterruptibly(); emptyAwait(); /** * map 中保存的最小的sm的key */ long firstKey; while ((firstKey = map.firstKey()) != atomicLong.get() && ((curNanoTime = System.nanoTime()) - preOutputNanoTime .get()) < delayNanoTime) { /** * 如果firstkey < 当前累计位置,进行错误备份 这里对等待的时间没做任何修改 * 只当该sm是个过客,不影响其它sm的等待时间 */ if (firstKey < atomicLong.get()) { sm = map.remove(firstKey); full.signal(); failBackUp(sm); emptyAwait(); continue; } long awitTime = delayNanoTime - (curNanoTime - preOutputNanoTime.get()); empty.await(awitTime, delayNanoUnit); } sm = map.remove(firstKey); full.signal(); atomicLong.set(firstKey + 1); setPreOutputNanoTime(curNanoTime); } catch (InterruptedException e) { empty.signal(); e.printStackTrace(); } finally { lock.unlock(); } return sm; } /** * 如果是空,直接等待,put时会signal * * @throws InterruptedException */ private void emptyAwait() throws InterruptedException { while (map.isEmpty()) { empty.await(); } } /** * 错误sm备份 * * @param sm */ private void failBackUp(SMessage sm) { } private void setPreOutputNanoTime(long nanoTime) { preOutputNanoTime.set(nanoTime); } @Override public SMessage next(long timeout, TimeUnit unit) { return null; } @Override public void put(long key, SMessage sm) { final ReentrantLock lock = this.lock; try { lock.lockInterruptibly(); while (map.size() >= maxBlockSize) full.await(); map.put(key, sm); empty.signal(); // 唤醒在next上等待的线程 } catch (InterruptedException e) { full.signal(); e.printStackTrace(); } finally { lock.unlock(); } } @Override public void put(SMessage sm) { put(sm.getSortId(), sm); } private static class proxy implements Serializable { private static final long serialVersionUID = -3999942172367616131L; private final static WaitSortBlockingQueue waitSortBlockingQueue = new WaitSortBlockingQueue(); } public static WaitSortBlockingQueue newInstance() { return proxy.waitSortBlockingQueue; } }
无所实现方案:(这个跳跃表貌似频繁的插入、移除有点慢,有木有更好的非阻塞、排序链表实现?)
package com.boco.sfmhandler.bolts.sender.sort; import java.util.Comparator; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import com.boco.sfmhandler.model.message.SMessage; public class NoAwaitSortQueue implements SortQueue<SMessage> { private static final long serialVersionUID = -6596506016522782898L; private static final int maxSize = 10000; private static Logger logger = Logger.getLogger(NoAwaitSortQueue.class); /** * 线程私有 */ private ThreadLocal<Long> preOutputNanoTime = new ThreadLocal<Long>() { @Override protected Long initialValue() { return System.nanoTime(); } }; private ThreadLocal<Long> index = new ThreadLocal<Long>() { @Override protected Long initialValue() { return 0L; } }; private ThreadLocal<Long> delayNanoTime = new ThreadLocal<Long>() { @Override protected Long initialValue() { return TimeUnit.NANOSECONDS.convert(1500L, TimeUnit.MILLISECONDS); // 1.5s } }; private final ConcurrentSkipListSet<SMessage> concurrentSkipListSet = new ConcurrentSkipListSet<SMessage>( new Comparator<SMessage>() { @Override public int compare(SMessage o1, SMessage o2) { return Long.signum(o1.getSortId() - o2.getSortId()); } }); @Override public void put(SMessage value) { while (concurrentSkipListSet.size() >= maxSize) { //nothing } concurrentSkipListSet.add(value); } @Override public void put(long key, SMessage value) { put(value); } @Override public SMessage next() { do { if (concurrentSkipListSet.isEmpty()) { continue; } SMessage sm = concurrentSkipListSet.first(); long sortId = sm.getSortId(); long curNanoTime = System.nanoTime(); final int k = Long.signum(sortId - index.get()); switch (k) { case 0: if (checkSm(sm, curNanoTime, sortId)) { return sm; } break; case -1: if (concurrentSkipListSet.remove(sm)) { failBackUp(sm); } break; case 1: if ((curNanoTime - preOutputNanoTime.get()) < delayNanoTime .get()) { continue; } else { if (checkSm(sm, curNanoTime, sortId)) { return sm; } } } } while (true); } private boolean checkSm(SMessage sm, long curNanoTime, long sortId) { boolean b = concurrentSkipListSet.remove(sm); if (b) { preOutputNanoTime.set(curNanoTime); index.set(sortId + 1); } return b; } private void failBackUp(SMessage smessage) { } @Override public SMessage next(long timeout, TimeUnit unit) { return null; } private static class proxy { private static NoAwaitSortQueue noAwaitSortQueue = new NoAwaitSortQueue(); } public static NoAwaitSortQueue newInstance() { return proxy.noAwaitSortQueue; } public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { for (long i = 0; true; i++) { SMessage sm = new SMessage(); sm.setSortId(i); newInstance().put(sm); } } }).start(); new Thread(new Runnable() { @Override public void run() { while (true) { SMessage sm = newInstance().next(); logger.info(sm.getSortId()); } } }).start(); } }
相关推荐
一次XGBoost性能优化-超线程影响运算速度.doc
使用场景为一个线程写一个线程读完全不需要锁。可以设定buffer的初始块及数量,初始块是固定大小的,当需要扩环时会动态创建块即不像其它的库块满了就写失败了,当释放时会将动态创建的块还给系统,初始块还给自己的...
ICP算法加速优化--多线程和GPU已成功编译配置:Windows10下环境配置:cmake3.23.3+VS2019+CUDA11.1+PCL1.12.1Ubuntu20.04(WSL)下环境配置:基于docker镜像cuda11.1-gcc,cmake3.16.3+PCL1.10.0
多线程复制的另一种实现,不是分块随机读写多线程,而是一个线程读,一个线程写的copy
vc++ 多线程教程---线程通信--利用事件对象,线程同步--使用信号量,线程同步--使用互斥量,线程同步--使用临界区
实现一个数据单元,包括学号和姓名两部分。编写两个线程,一个线程往数据单元中写,另一个线程往出读。要求每写一次就往出读一次。
Android----线程实现图片移动 , 运用线程实现图片的水平循环往返移动!
人工智能-项目实践-多线程-一个多线程多进程的下载DEMO 人工智能-项目实践-多线程-一个多线程多进程的下载DEMO
基于java的开发源码-多线程程序死锁检查 JCarder.zip 基于java的开发源码-多线程程序死锁检查 JCarder.zip 基于java的开发源码-多线程程序死锁检查 JCarder.zip 基于java的开发源码-多线程程序死锁检查 JCarder.zip ...
操作系统实验报告-理解线程和请求分页存储管理设计.docx操作系统实验报告-理解线程和请求分页存储管理设计.docx操作系统实验报告-理解线程和请求分页存储管理设计.docx操作系统实验报告-理解线程和请求分页存储管理...
人工智能-项目实践-多线程-多线程爬虫--抓取淘宝商品详情页URL 本项目是一个Java编写的多线程爬虫系统。此系统与我之前开发的ip-proxy-pools-regularly结合使用,共抓取了淘宝近3000个页面,从中解析到了近9万的...
C++多线程一步步编程C++多线程一步步编程---多线程
EasyTcpServer-优化线程-跨平台兼容-utf8 bom字符编码
人工智能-项目实践-多线程-多线程与高并发 多线程与高并发
多线程编程-监控线程
计算机网络课程设计----多线程Web服务器 完美程序+完整的报告
多线程相关知识源码-----多线程案例源码
人工智能-项目实践-多线程-tonado的multi-thread 多线程封装 Quick Start 1.在“biz”目录中创建一个py文件,文件名任意但最好不要跟第三方库冲突 2.使用 "Router.route" 装饰器注册函数到路由表中,仿造示例即可 ...
3.1 认识线程 在操作系统中,一个任务对应一个进程,例如浏览网页启动一个浏览器进程,听音乐启动一个音乐进程等等,在进程中同样包含多个子任务,多个子任务间同时执行,例如音乐进程,一边有声音的播放,还有字幕...
如果需要实现一个线程或有效个线程负责对SocketChannel数据的读操作,为了保证线程安全,同一时间需要保证只有一个线程在负责读操作。即需要采用SelectionKey.cancel()从注册的Selector中取消对该Selection的监视,...