澳门新葡11599_www.11599.com_澳门新萄京娱乐11599平台

java中通用的线程池实例代码,java线程池实例澳门

日期:2019-07-23编辑作者:澳门新葡11599

java使用默认线程池踩过的坑(1)

场景

一个调度器,两个调度任务,分别处理两个目录下的txt文件,某个调度任务应对某些复杂问题的时候会持续特别长的时间,甚至有一直阻塞的可能。我们需要一个manager来管理这些task,当这个task的上一次执行时间距离现在超过5个调度周期的时候,就直接停掉这个线程,然后再重启它,保证两个目标目录下没有待处理的txt文件堆积。

问题

直接使用java默认的线程池调度task1和task2.由于外部txt的种种不可控原因,导致task2线程阻塞。现象就是task1和线程池调度器都正常运行着,但是task2迟迟没有动作。

当然,找到具体的阻塞原因并进行针对性解决是很重要的。但是,这种措施很可能并不能完全、彻底、全面的处理好所有未知情况。我们需要保证任务线程或者调度器的健壮性!

方案计划

线程池调度器并没有原生的针对被调度线程的业务运行状态进行监控处理的API。因为task2是阻塞在我们的业务逻辑里的,所以最好的方式是写一个TaskManager,所有的任务线程在执行任务前全部到这个TaskManager这里来注册自己。这个TaskManager就负责对于每个自己管辖范围内的task进行实时全程监控!

后面的重点就是如何处理超过5个执行周期的task了。

方案如下:

●一旦发现这个task线程,立即中止它,然后再次重启;

●一旦发现这个task线程,直接将整个pool清空并停止,重新放入这两个task ——task明确的情况下】;

方案实施

中止后重启

●Task实现类

class FileTask extends Thread { private long lastExecTime = 0; protected long interval = 10000; public long getLastExecTime() {     return lastExecTime; } public void setLastExecTime(long lastExecTime) {     this.lastExecTime = lastExecTime; } public long getInterval() {     return interval; } public void setInterval(long interval) {     this.interval = interval; }  public File[] getFiles() {     return null; } 

●Override

public void run() { while (!Thread.currentThread().isInterrupted()) { lastExecTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName()   " is running -> "   new Date()); try { Thread.sleep(getInterval() * 6 * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); e.printStackTrace();    // 当线程池shutdown之后,这里就会抛出exception了             }         }     }     } 

●TaskManager

public class TaskManager  implements Runnable { private final static Log logger = LogFactory.getLog(TaskManager .class); public Set<FileTask> runners = new CopyOnWriteArraySet<FileTask>(); ExecutorService pool = Executors.newCachedThreadPool(); public void registerCodeRunnable(FileTask process) { runners.add(process); } public TaskManager (Set<FileTask> runners) { this.runners = runners; } 

@Override

public void run() {        while (!Thread.currentThread().isInterrupted()) {            try {                long current = System.currentTimeMillis();                for (FileTask wrapper : runners) {                    if (current - wrapper.getLastExecTime() > wrapper.getInterval() * 5) {                        wrapper.interrupt();                        for (File file : wrapper.getFiles()) {                            file.delete();                        }                     wrapper.start();                      }                }            } catch (Exception e1) {                logger.error("Error happens when we trying to interrupt and restart a task ");                ExceptionCollector.registerException(e1);            }            try {                Thread.sleep(500);            } catch (InterruptedException e) {            }        }    }     

这段代码会报错 java.lang.Thread IllegalThreadStateException。为什么呢?其实这是一个很基础的问题,您应该不会像我一样马虎。查看Thread.start()的注释, 有这样一段:

It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution.

是的,一个线程不能够启动两次。那么它是怎么判断的呢?

public synchronized void start() {         /**          * A zero status value corresponds to state "NEW".    0对应的是state NEW          */ 

if (threadStatus != 0) //如果不是NEW state,就直接抛出异常!


澳门新葡11599 1


) 场景 一个调度器,两个调度任务,分别处理两个目录下的txt文件,某个调度任务应对某些复杂问题的时候会...

java中通用的线程池实例代码,java线程池实例

复制代码 代码如下:
package com.smart.frame.task.autoTask;

import java.util.Collection;
import java.util.Vector;

/**
 * 任务分发器
 */
public class TaskManage extends Thread
{
    protected Vector<Runnable> tasks = new Vector<Runnable>();
    protected boolean running = false;
    protected boolean stopped = false;
    protected boolean paused = false;
    protected boolean killed = false;
    private ThreadPool pool;

    public TaskManage(ThreadPool pool)
    {
        this.pool = pool;
    }

    public void putTask(Runnable task)
    {
        tasks.add(task);
    }

    public void putTasks(Runnable[] tasks)
    {
        for (int i = 0; i < tasks.length; i )
            this.tasks.add(tasks[i]);
    }

    public void putTasks(Collection<Runnable> tasks)
    {
        this.tasks.addAll(tasks);
    }

    protected Runnable popTask()
    {
        if (tasks.size() > 0) return (Runnable) tasks.remove(0);
        else return null;
    }

    public boolean isRunning()
    {
        return running;
    }

    public void stopTasks()
    {
        stopped = true;
    }

    public void stopTasksSync()
    {
        stopTasks();
        while (isRunning())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public void pauseTasks()
    {
        paused = true;
    }

    public void pauseTasksSync()
    {
        pauseTasks();
        while (isRunning())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public void kill()
    {
        if (!running) interrupt();
        else killed = true;
    }

    public void killSync()
    {
        kill();
        while (isAlive())
        {
            try
            {
                sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
            }
        }
    }

    public synchronized void startTasks()
    {
        running = true;
        this.notify();
    }

    public synchronized void run()
    {
        try
        {
            while (true)
            {
                if (!running || tasks.size() == 0)
                {
                    pool.notifyForIdleThread();
                    this.wait();
                }
                else
                {
                    Runnable task;
                    while ((task = popTask()) != null)
                    {
                        task.run();
                        if (stopped)
                        {
                            stopped = false;
                            if (tasks.size() > 0)
                            {
                                tasks.clear();
                                System.out.println(Thread.currentThread().getId() ": Tasks are stopped");
                                break;
                            }
                        }
                        if (paused)
                        {
                            paused = false;
                            if (tasks.size() > 0)
                            {
                                System.out.println(Thread.currentThread().getId() ": Tasks are paused");
                                break;
                            }
                        }
                    }
                    running = false;
                }

                if (killed)
                {
                    killed = false;
                    break;
                }
            }
        }
        catch (InterruptedException e)
        {
            TaskException.getResultMessage(e);
            return;
        }
    }
}

复制代码 代码如下:
package com.smart.frame.task.autoTask;

import java.util.Collection;
import java.util.Iterator;
import java.util.Vector;

/**
 * 线程池
 */
public class ThreadPool
{
    protected int maxPoolSize = TaskConfig.maxPoolSize;
    protected int initPoolSize = TaskConfig.initPoolSize;
    protected Vector<TaskManage> threads = new Vector<TaskManage>();
    protected boolean initialized = false;
    protected boolean hasIdleThread = false;

    public ThreadPool()
    {
        super();
    }

    public ThreadPool(int maxPoolSize, int initPoolSize)
    {
        this.maxPoolSize = maxPoolSize;
        this.initPoolSize = initPoolSize;
    }

    public void init()
    {
        initialized = true;
        for (int i = 0; i < initPoolSize; i )
        {
            TaskManage thread = new TaskManage(this);
            thread.start();
            threads.add(thread);
        }
    }

    public void setMaxPoolSize(int maxPoolSize)
    {
        this.maxPoolSize = maxPoolSize;
        if (maxPoolSize < getPoolSize()) setPoolSize(maxPoolSize);
    }

    /**
     * 重设当前线程数 若需杀掉某线程,线程不会立刻杀掉,而会等到线程中的事
     * 务处理完成 但此方法会立刻从线程池中移除该线程,不会等待事务处理结束
     */
    public void setPoolSize(int size)
    {
        if (!initialized)
        {
            initPoolSize = size;
            return;
        }
        else if (size > getPoolSize())
        {
            for (int i = getPoolSize(); i < size && i < maxPoolSize; i )
            {
                TaskManage thread = new TaskManage(this);
                thread.start();
                threads.add(thread);
            }
        }
        else if (size < getPoolSize())
        {
            while (getPoolSize() > size)
            {
                TaskManage th = (TaskManage) threads.remove(0);
                th.kill();
            }
        }
    }

    public int getPoolSize()
    {
        return threads.size();
    }

    protected void notifyForIdleThread()
    {
        hasIdleThread = true;
    }

    protected boolean waitForIdleThread()
    {
        hasIdleThread = false;
        while (!hasIdleThread && getPoolSize() >= maxPoolSize)
        {
            try
            {
                Thread.sleep(5);
            }
            catch (InterruptedException e)
            {
                TaskException.getResultMessage(e);
                return false;
            }
        }

        return true;
    }

    public synchronized TaskManage getIdleThread()
    {
        while (true)
        {
            for (Iterator<TaskManage> itr = threads.iterator(); itr.hasNext();)
            {
                TaskManage th = (TaskManage) itr.next();
                if (!th.isRunning()) return th;
            }

            if (getPoolSize() < maxPoolSize)
            {
                TaskManage thread = new TaskManage(this);
                thread.start();
                threads.add(thread);
                return thread;
            }

            if (waitForIdleThread() == false) return null;
        }
    }

    public void processTask(Runnable task)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTask(task);
            th.startTasks();
        }
    }

    public void processTasksInSingleThread(Runnable[] tasks)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTasks(tasks);
            th.startTasks();
        }
    }

    public void processTasksInSingleThread(Collection<Runnable> tasks)
    {
        TaskManage th = getIdleThread();
        if (th != null)
        {
            th.putTasks(tasks);
            th.startTasks();
        }
    }

}

复制代码 代码如下:
package com.smart.frame.task.autoTask;

public class TopTask implements Runnable
{

    private ThreadPool pool;

    public TopTask()
    {
        super();
    }

    public TopTask(ThreadPool pool)
    {
        super();
        this.pool = pool;
    }

    @Override
    public void run()
    {
        init();
        start();
    }

    /**
     * 初始化验证权限、参数之类
     */
    public void init()
    {

    }

    /**
     * 开始自动任务
     */
    public void start()
    {
        for (int i = 0; i < 10; i )
        {
            pool.processTask(new BeginAuto());
        }
    }
}
/**
 * 实现类
 */
class BeginAuto implements Runnable
{
    @Override
    public void run()
    {
        System.out.println(Thread.currentThread().getId() "..................");
    }

}

复制代码 代码如下: package com.smart.frame.task.autoTask; import java.util.Collection; import java.util.Vector;...

在Java中有两类线程:User Thread(用户线程)、Daemon Thread(守护线程) 

用个比较通俗的比如,任何一个守护线程都是整个JVM中所有非守护线程的保姆:

只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作。
Daemon的作用是为其他线程的运行提供便利服务,守护线程最典型的应用就是 GC (垃圾回收器),它就是一个很称职的守护者。

User和Daemon两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果 User Thread已经全部退出运行了,只剩下Daemon Thread存在了,虚拟机也就退出了。 因为没有了被守护者,Daemon也就没有工作可做了,也就没有继续运行程序的必要了。

值得一提的是,守护线程并非只有虚拟机内部提供,用户在编写程序时也可以自己设置守护线程。下面的方法就是用来设置守护线程的。 

[java] view plain copy

 

  1. Thread daemonTread = new Thread();  
  2.    
  3.   // 设定 daemonThread 为 守护线程,default false(非守护线程)  
  4.  daemonThread.setDaemon(true);  
  5.    
  6.  // 验证当前线程是否为守护线程,返回 true 则为守护线程  
  7.  daemonThread.isDaemon();  

这里有几点需要注意: 

(1) thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。你不能把正在运行的常规线程设置为守护线程。
(2) 在Daemon线程中产生的新线程也是Daemon的。 
(3) 不要认为所有的应用都可以分配给Daemon来进行服务,比如读写操作或者计算逻辑。 

因为你不可能知道在所有的User完成之前,Daemon是否已经完成了预期的服务任务。一旦User退出了,可能大量数据还没有来得及读入或写出,计算任务也可能多次运行结果不一样。这对程序是毁灭性的。造成这个结果理由已经说过了:一旦所有User Thread离开了,虚拟机也就退出运行了。 

 

[java] view plain copy

 

  1. //完成文件输出的守护线程任务  
  2. import java.io.*;     
  3.     
  4. class TestRunnable implements Runnable{     
  5.     public void run(){     
  6.                try{     
  7.                   Thread.sleep(1000);//守护线程阻塞1秒后运行     
  8.                   File f=new File("daemon.txt");     
  9.                   FileOutputStream os=new FileOutputStream(f,true);     
  10.                   os.write("daemon".getBytes());     
  11.            }     
  12.                catch(IOException e1){     
  13.           e1.printStackTrace();     
  14.                }     
  15.                catch(InterruptedException e2){     
  16.                   e2.printStackTrace();     
  17.            }     
  18.     }     
  19. }     
  20. public class TestDemo2{     
  21.     public static void main(String[] args) throws InterruptedException     
  22.     {     
  23.         Runnable tr=new TestRunnable();     
  24.         Thread thread=new Thread(tr);     
  25.                 thread.setDaemon(true); //设置守护线程     
  26.         thread.start(); //开始执行分进程     
  27.     }     
  28. }     
  29. //运行结果:文件daemon.txt中没有"daemon"字符串。  

看到了吧,把输入输出逻辑包装进守护线程多么的可怕,字符串并没有写入指定文件。原因也很简单,直到主线程完成,守护线程仍处于1秒的阻塞状态。这个时候主线程很快就运行完了,虚拟机退出,Daemon停止服务,输出操作自然失败了。

 

 

[java] view plain copy

 

  1. public class Test {  
  2.   public static void main(String args) {  
  3.   Thread t1 = new MyCommon();  
  4.   Thread t2 = new Thread(new MyDaemon());  
  5.   t2.setDaemon(true); //设置为守护线程  
  6.   t2.start();  
  7.   t1.start();  
  8.   }  
  9.   }  
  10.   class MyCommon extends Thread {  
  11.   public void run() {  
  12.   for (int i = 0; i < 5; i ) {  
  13.   System.out.println("线程1第"   i   "次执行!");  
  14.   try {  
  15.   Thread.sleep(7);  
  16.   } catch (InterruptedException e) {  
  17.   e.printStackTrace();  
  18.   }  
  19.   }  
  20.   }  
  21.   }  

 

[html] view plain copy

 

  1. class MyDaemon implements Runnable {  
  2.   public void run() {  
  3.   for (long i = 0; i < 9999999L; i ) {  
  4.   System.out.println("后台线程第"   i   "次执行!");  
  5.   try {  
  6.   Thread.sleep(7);  
  7.   } catch (InterruptedException e) {  
  8.   e.printStackTrace();  
  9.   }  
  10.   }  
  11.   }  
  12.   }  

后台线程第0次执行!
  线程1第0次执行! 
  线程1第1次执行! 
  后台线程第1次执行! 
  后台线程第2次执行! 
  线程1第2次执行! 
  线程1第3次执行! 
  后台线程第3次执行! 
  线程1第4次执行! 
  后台线程第4次执行! 
  后台线程第5次执行! 
  后台线程第6次执行! 
  后台线程第7次执行! 
  Process finished with exit code 0 
  从上面的执行结果可以看出: 
  前台线程是保证执行完毕的,后台线程还没有执行完毕就退出了。 

  实际上:JRE判断程序是否执行结束的标准是所有的前台执线程行完毕了,而不管后台线程的状态,因此,在使用后台县城时候一定要注意这个问题。 

补充说明:
定义:守护线程--也称“服务线程”,在没有用户线程可服务时会自动离开。
优先级:守护线程的优先级比较低,用于为系统中的其它对象和线程提供服务。
设置:通过setDaemon(true)来设置线程为“守护线程”;将一个用户线程设置为
守护线程的方式是在 线程对象创建 之前 用线程对象的setDaemon方法。
example: 垃圾回收线程就是一个经典的守护线程,当我们的程序中不再有任何运行的
Thread,程序就不会再产生垃圾,垃圾回收器也就无事可做,所以当垃圾回收线程是
JVM上仅剩的线程时,垃圾回收线程会自动离开。它始终在低级别的状态中运行,用于
实时监控和管理系统中的可回收资源。
生命周期:守护进程(Daemon)是运行在后台的一种特殊进程。它独立于控制终端并且
周期性地执行某种任务或等待处理某些发生的事件。也就是
说守护线程不依赖于终端,但是依赖于系统,与系统“同生共死”。那Java的守护线程是
什么样子的呢。当JVM中所有的线程都是守护线程的时候,JVM就可以退出了;如果还有一个
或以上的非守护线程则JVM不会退出。

实际应用例子:

在使用长连接的comet服务端推送技术中,消息推送线程设置为守护线程,服务于ChatServlet的servlet用户线程,在servlet的init启动消息线程,servlet一旦初始化后,一直存在服务器,servlet摧毁后,消息线程自动退出

容器收到一个Servlet请求,调度线程从线程池中选出一个工作者线程,将请求传递给该工作者线程,然后由该线程来执行Servlet的 service方法。当这个线程正在执行的时候,容器收到另外一个请求,调度线程同样从线程池中选出另一个工作者线程来服务新的请求,容器并不关心这个请求是否访问的是同一个Servlet.当容器同时收到对同一个Servlet的多个请求的时候,那么这个Servlet的service()方法将在多线程中并发执行。
Servlet容器默认采用单实例多线程的方式来处理请求,这样减少产生Servlet实例的开销,提升了对请求的响应时间,对于Tomcat可以在server.xml中通过<Connector>元素设置线程池中线程的数目。
如图: 

澳门新葡11599 2 

为什么要用守护线程?

 

我们知道静态变量是ClassLoader级别的,如果Web应用程序停止,这些静态变量也会从JVM中清除。但是线程则是JVM级别的,如果你在Web 应用中启动一个线程,这个线程的生命周期并不会和Web应用程序保持同步。也就是说,即使你停止了Web应用,这个线程依旧是活跃的。正是因为这个很隐晦 的问题,所以很多有经验的开发者不太赞成在Web应用中私自启动线程。

如果我们手工使用JDK Timer(Quartz的Scheduler),在Web容器启动时启动Timer,当Web容器关闭时,除非你手工关闭这个Timer,否则Timer中的任务还会继续运行!

下面通过一个小例子来演示这个“诡异”的现象,我们通过ServletContextListener在Web容器启动时创建一个Timer并周期性地运行一个任务:  

[java] view plain copy

 

  1. //代码清单StartCycleRunTask:容器监听器  
  2. package com.baobaotao.web;  
  3. import java.util.Date;  
  4. import java.util.Timer;  
  5. import java.util.TimerTask;  
  6. import javax.servlet.ServletContextEvent;  
  7. import javax.servlet.ServletContextListener;  
  8. public class StartCycleRunTask implements ServletContextListener ...{  
  9.     private Timer timer;  
  10.     public void contextDestroyed(ServletContextEvent arg0) ...{  
  11.         // ②该方法在Web容器关闭时执行  
  12.         System.out.println("Web应用程序启动关闭...");  
  13.     }  
  14.     public void contextInitialized(ServletContextEvent arg0) ...{  
  15.          //②在Web容器启动时自动执行该方法  
  16.         System.out.println("Web应用程序启动...");  
  17.         timer = new Timer();//②-1:创建一个Timer,Timer内部自动创建一个背景线程  
  18.         TimerTask task = new SimpleTimerTask();  
  19.         timer.schedule(task, 1000L, 5000L); //②-2:注册一个5秒钟运行一次的任务  
  20.     }  
  21. }  
  22. class SimpleTimerTask extends TimerTask ...{//③任务  
  23.     private int count;  
  24.     public void run() ...{  
  25.         System.out.println(( count) "execute task..." (new Date()));  
  26.     }  
  27. }  

在web.xml中声明这个Web容器监听器:<?xml version="1.0" encoding="UTF-8"?>
<web-app> 
… 
<listener> 
<listener-class>com.baobaotao.web.StartCycleRunTask</listener-class> 
</listener> 
</web-app> 

在Tomcat中部署这个Web应用并启动后,你将看到任务每隔5秒钟执行一次。 
运行一段时间后,登录Tomcat管理后台,将对应的Web应用(chapter13)关闭。 

转到Tomcat控制台,你将看到虽然Web应用已经关闭,但Timer任务还在我行我素地执行如故——舞台已经拆除,戏子继续表演: 

我们可以通过改变清单StartCycleRunTask的代码,在contextDestroyed(ServletContextEvent arg0)中添加timer.cancel()代码,在Web容器关闭后手工停止Timer来结束任务。

Spring为JDK Timer和Quartz Scheduler所提供的TimerFactoryBean和SchedulerFactoryBean能够和Spring容器的生命周期关联,在 Spring容器启动时启动调度器,而在Spring容器关闭时,停止调度器。所以在Spring中通过这两个FactoryBean配置调度器,再从 Spring IoC中获取调度器引用进行任务调度将不会出现这种Web容器关闭而任务依然运行的问题。而如果你在程序中直接使用Timer或Scheduler,如不 进行额外的处理,将会出现这一问题。 

本文由澳门新葡11599发布于澳门新葡11599,转载请注明出处:java中通用的线程池实例代码,java线程池实例澳门

关键词: 澳门新葡11599

ITSM的价值体现澳门新葡11599

ITSM的由来 ITSM的由来 ITSMIT ServiceManagement,IT服务管理)是一套扶助公司对IT系统的安排性、研究开发、施行和平运动营...

详细>>

用户对BSM所陷入的误区澳门新葡11599

认清需求为企业真正所用 因此,从以上三方面可以看出,要实现BSM的核心观念,就是必须同业务部门有交互,北塔软...

详细>>

分享管理不同操作系统的两种方式澳门新葡1159

分享管理差别操作系统的三种方法 数量宗旨的管理人能够经过各样工具和手艺的结合,在单个系统就能够达成对多少...

详细>>

BSM:不再是空中楼阁【澳门新葡11599】

IT运营管理在中原业已具有多年的进步历史,从最初的互联网管理到近年来的IT服务管理,越发伟大的对象是职业服务...

详细>>