source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/ParallelHarvester.java @ 3011

Last change on this file since 3011 was 3011, checked in by oschonef, 11 years ago
  • update Woodstox, SLF4J and Apache Commons HTTP client dependencies
  • several interoperability fixes (including relaxed behavior)
  • several bug fixes
  • Property svn:eol-style set to native
File size: 4.1 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.net.URI;
4import java.util.HashSet;
5import java.util.concurrent.DelayQueue;
6import java.util.concurrent.TimeUnit;
7import java.util.concurrent.atomic.AtomicBoolean;
8import java.util.concurrent.atomic.AtomicLong;
9
10import org.slf4j.Logger;
11import org.slf4j.LoggerFactory;
12
13import eu.clarin.cmdi.oai.harvester.HarvestHandler;
14import eu.clarin.cmdi.oai.harvester.HarvestJob;
15import eu.clarin.cmdi.oai.harvester.Harvester;
16import eu.clarin.cmdi.oai.harvester.HarvesterException;
17
18
19public class ParallelHarvester extends AbstractHarvester {
20    private final class WorkerThread extends Thread {
21        private final HarvestWorker worker = new HarvestWorker(
22                ParallelHarvester.this);
23        private AtomicBoolean loop = new AtomicBoolean(true);
24
25
26        @Override
27        public void run() {
28            while (loop.get()) {
29                try {
30                    logger.debug("[{}] waiting ...", getId());
31                    final HarvestJobImpl job = queue.take();
32                    logger.debug("[{}] got job {}", getId(), job.getId());
33                    try {
34                        while (true) {
35                            logger.debug("[{}] processing job {} ...",
36                                    getId(), job.getId());
37                            worker.process(job, maxNetworkRetryCount);
38                            if (job.isRunning()) {
39                                if (job.getDelay(TimeUnit.MILLISECONDS) > 0) {
40                                    queue.offer(job);
41                                    logger.debug("[{}] requeuing job {} ...",
42                                            getId(), job.getId());
43                                } else {
44                                    continue;
45                                }
46                            }
47                            break;
48                        } // while
49                    } catch (HarvesterException e) {
50                        e.printStackTrace();
51                    }
52                } catch (InterruptedException e) {
53                    /* IGNORE */
54                }
55            } // while
56            logger.debug("[{}] exiting ...", getId());
57        }
58
59
60        void shutdown() {
61            loop.set(false);
62            this.interrupt();
63        }
64    }
65    private static final Logger logger =
66            LoggerFactory.getLogger(ParallelHarvester.class);
67    private final HashSet<WorkerThread> workers =
68            new HashSet<WorkerThread>();
69    private final DelayQueue<HarvestJobImpl> queue =
70            new DelayQueue<HarvestJobImpl>();
71    private final AtomicLong nextId = new AtomicLong(0);
72
73
74    private ParallelHarvester() {
75        for (int i = 0; i < 4; i++) {
76            workers.add(new WorkerThread());
77        }
78    }
79
80
81    @Override
82    public HarvestJob createJob(URI repositoryURI, HarvestHandler handler)
83            throws HarvesterException {
84        return new HarvestJobImpl(this, nextId.incrementAndGet(),
85                repositoryURI, handler);
86    }
87
88
89    public static Harvester newInstance() {
90        ParallelHarvester harvester = new ParallelHarvester();
91        for (WorkerThread w : harvester.workers) {
92            w.start();
93        }
94        return harvester;
95    }
96
97
98    @Override
99    public void shutdown() {
100        for (WorkerThread w : workers) {
101            w.shutdown();
102            try {
103                w.join();
104            } catch (InterruptedException e) {
105                logger.error("interrupted while shutting down");
106            }
107        }
108    }
109
110
111    @Override
112    void doRunJob(HarvestJobImpl job) throws HarvesterException {
113        job.setState(HarvestJob.State.RUNNING);
114        queue.offer(job);
115    }
116
117
118    @Override
119    void doCancelJob(HarvestJobImpl job) throws HarvesterException {
120        queue.remove(job);
121    }
122
123
124    @Override
125    protected HarvestJob doFindJob(long id) throws HarvesterException {
126        for (HarvestJobImpl job : queue) {
127            if (id == job.getId()) {
128                queue.remove(job);
129                return job;
130            }
131        }
132        return null;
133    }
134
135} // class ParallelHarvester
Note: See TracBrowser for help on using the repository browser.