source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/ParallelHarvester.java @ 1140

Last change on this file since 1140 was 1140, checked in by oschonef, 13 years ago
  • make maximum network retry count configurable through Harvester
  • re-naming internal methods
  • Property svn:eol-style set to native
File size: 3.9 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.net.URI;
4import java.util.HashSet;
5import java.util.concurrent.DelayQueue;
6import java.util.concurrent.TimeUnit;
7import java.util.concurrent.atomic.AtomicBoolean;
8import java.util.concurrent.atomic.AtomicLong;
9
10import eu.clarin.cmdi.oai.harvester.HarvestHandler;
11import eu.clarin.cmdi.oai.harvester.HarvestJob;
12import eu.clarin.cmdi.oai.harvester.Harvester;
13import eu.clarin.cmdi.oai.harvester.HarvesterException;
14
15public class ParallelHarvester extends AbstractHarvester {
16    private final class WorkerThread extends Thread {
17        private final HarvestWorker worker =
18            new HarvestWorker(ParallelHarvester.this);
19        private AtomicBoolean loop = new AtomicBoolean(true);
20       
21        @Override
22        public void run() {
23            while (loop.get()) {
24                try {
25                    System.err.println(getId() + " waiting ...");
26                    HarvestJobImpl job = queue.take();
27                    System.err.println(getId() + " got job " + job.getId());
28                    try {
29                        while (true) {
30                            System.err.println(getId() + " process job " +
31                                    job.getId());
32                            worker.process(job, maxNetworkRetryCount);
33                            if (job.isRunning()) {
34                                if (job.getDelay(TimeUnit.MILLISECONDS) > 0) {
35                                    queue.offer(job);
36                                    System.err.println(getId() +
37                                            " requeuing job " + job.getId());
38                                } else {
39                                    continue;
40                                }
41                            }
42                            break;
43                        } // while
44                    } catch (HarvesterException e) {
45                        e.printStackTrace();
46                    }
47                } catch (InterruptedException e) {
48                    /* IGNORE */
49                }
50            } // while
51            System.err.println(getId() + " ... exit");
52        }
53
54        void shutdown() {
55            loop.set(false);
56            this.interrupt();
57        }
58    }
59
60    private final HashSet<WorkerThread> workers = new HashSet<WorkerThread>();
61    private final DelayQueue<HarvestJobImpl> queue = new DelayQueue<HarvestJobImpl>();
62    private AtomicLong nextId = new AtomicLong();
63
64    private ParallelHarvester() {
65        for (int i = 0; i < 4; i++) {
66            workers.add(new WorkerThread());
67        }
68    }
69
70    @Override
71    public HarvestJob createJob(URI repositoryURI, HarvestHandler handler)
72            throws HarvesterException {
73        return new HarvestJobImpl(this, nextId.incrementAndGet(),
74                repositoryURI, handler);
75    }
76
77    public static Harvester newInstance() {
78        ParallelHarvester harvester = new ParallelHarvester();
79        for (WorkerThread w : harvester.workers) {
80            w.start();
81        }
82        return harvester;
83    }
84
85    @Override
86    public void shutdown() {
87        for (WorkerThread w : workers) {
88            w.shutdown();
89            try {
90                w.join();
91            } catch (InterruptedException e) {
92                System.err.println("join fail!");
93            }
94        }
95    }
96
97
98    @Override
99    void doRunJob(HarvestJobImpl job) throws HarvesterException {
100        job.setState(HarvestJob.State.RUNNING);
101        queue.offer(job);
102    }
103
104    @Override
105    void doCancelJob(HarvestJobImpl job) throws HarvesterException {
106        queue.remove(job);
107    }
108
109    @Override
110    protected HarvestJob doFindJob(String id) throws HarvesterException {
111        for (HarvestJobImpl job : queue) {
112            if (id.equals(job.getId())) {
113                queue.remove(job);
114                return job;
115            }
116        }
117        return null;
118    }
119
120} // class ParallelHarvester
Note: See TracBrowser for help on using the repository browser.