本文共 2276 字,大约阅读时间需要 7 分钟。
最近很长时间没更新博客了,主要是最近项目上线,加班比较多,同时自己还得拿出一部分时间在网上找找资料学学新东西,也就没有额外的时间写博客了。
好了 进入正题,这篇博客主要是记录我在项目上线期间,由于项目上线之后需要批量获取现场终端设备状态(终端设备数为1500万左右,页面上点的话不能满足需求,一次只能召测数百),因此我用公司原有的一套代码去实现这个召测任务——由于代码我不可能贴出来,且我主要是分享我改造的并发思路,因此我将自己编写一些事例或伪代码用于帮助读者理解!
这套代码主要包含两个方法,一个就是发起召测的send方法,用于告诉终端我要你的数据,你得给我返回来并放到我指定的缓存中存着,send方法返回一个token作为下一个方法获取结果的依据(该token是异步返回的,这样会提高代码运行效率)。第二个方法为getResult,即根据send方法返回的token获取缓存中的结果。
第一天,由于是试召,公司要求召测的数量只有30万不到,所有召测必须在2小时之内执行完毕,2小时即为执行时间上限。第一天任务很简单,直接用公司原有的代码就搞定。
第二天,数量增加到50万,此时我便发现,两小时好像执行不完了——由于设计硬件设备,所以这里并不能简单的以为就是发50万数据便了,还需要考虑召测的成功率,终端响应极限等其它因数。这个时候我便开始看源码,发现这段代码一共开启了两个线程,一个线程用于执行send,另一个线程用于执行get方法,从他的代码我看出来,他的本意是两个线程共用同一把锁,通过wait和notify的通信机制,控制send方法每执行5000次,并将当前线程阻塞,并唤醒另一个执行get方法的线程,当get执行完成又唤醒执行send方法线程,不过由于代码中将字符串常量作为了锁对象,因此实际上加锁失败,然而原代码中便是通过sleep()代替了锁机制,即每执行5000条线程便会睡眠固定时间,然而事实上是,方法执行的时间其实并不固定,大部分根本等不了这么长时间,因此浪费了大量时间,
原代码:
更改前伪代码:线程A { run(){ while(true){ String token =send(); arryList.add(token); if(sendTimes == 5000 ){ Thread.sleep(1000*n); } } }}线程B { run(){ while(true){ if(arryList 为空 ){ Thread.sleep(1000*n) } } }}更改之后伪代码:锁使用同一把类锁 创建一个类Clazz线程A { run(){ while(true){ synchronized(Clazz.class){ String token =send(); arryList.add(token); if(sendTimes == 5000 ){ // Thread.sleep(1000*n); Clazz.class.notify(); Clazz.class.wait(); } } } }}线程B { run(){ while(true){ synchronized(Clazz.class){ if(当 arryList 为空 ){ //Thread.sleep(1000*n) Clazz.class.notify(); Clazz.class.wait(); } } } }}
第三天,需要对所有终端实现两小时内所有数据的召测,这个时候不用想,两个线程绝对不够,因此我打算开多线程了,由于send的返回值需要作为get方法的查询依据 ,因此我将send方法返回的token存到了一个并发阻塞队列LinkedBlockingQueue中,使用生产消费模式实现。
ConcurrentLinkedQueuequeue= new ConcurrentLinkedQueue ();线程A { run(){ while(true){ synchronized(Clazz.class){ if : 当无再召测的终端时countDownLatch.countDown()并break退出循环 else : String token =send(); 将token保存到队列中queue.put(token); } } }}线程B { run(){ while(true){ String token = queue.take(); if : 当token为空时countDownLatch.countDown()并break退出循环 执行获取结果方法get(token); } }}//主方法的伪代码main(){ 服务器有16个可用线程,主方法通过countDownLatch用于判断线程是否执行,可以通过结束时间判断并输出任务执行时间等 for(循环16次){ 线程A.start() 线程B.start() } 当任务还在执行的时候主线程阻塞在此处:this.countDownLatch.await(); 关闭资源,统计任务执行时间等后续操作 }
转载地址:http://gscii.baihongyu.com/