Changeset 3011 for OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/ParallelHarvester.java
- Timestamp:
- 06/10/13 19:50:23 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/ParallelHarvester.java
r1148 r3011 8 8 import java.util.concurrent.atomic.AtomicLong; 9 9 10 import org.slf4j.Logger; 11 import org.slf4j.LoggerFactory; 12 10 13 import eu.clarin.cmdi.oai.harvester.HarvestHandler; 11 14 import eu.clarin.cmdi.oai.harvester.HarvestJob; … … 13 16 import eu.clarin.cmdi.oai.harvester.HarvesterException; 14 17 18 15 19 public class ParallelHarvester extends AbstractHarvester { 16 20 private final class WorkerThread extends Thread { 17 private final HarvestWorker worker = 18 new HarvestWorker(ParallelHarvester.this);21 private final HarvestWorker worker = new HarvestWorker( 22 ParallelHarvester.this); 19 23 private AtomicBoolean loop = new AtomicBoolean(true); 20 24 25 21 26 @Override 22 27 public void run() { 23 28 while (loop.get()) { 24 29 try { 25 System.err.println(getId() + " waiting ...");26 HarvestJobImpl job = queue.take();27 System.err.println(getId() + " got job " +job.getId());30 logger.debug("[{}] waiting ...", getId()); 31 final HarvestJobImpl job = queue.take(); 32 logger.debug("[{}] got job {}", getId(), job.getId()); 28 33 try { 29 34 while (true) { 30 System.err.println(getId() + " process job " +31 job.getId());35 logger.debug("[{}] processing job {} ...", 36 getId(), job.getId()); 32 37 worker.process(job, maxNetworkRetryCount); 33 38 if (job.isRunning()) { 34 39 if (job.getDelay(TimeUnit.MILLISECONDS) > 0) { 35 40 queue.offer(job); 36 System.err.println(getId() +37 " requeuing job " +job.getId());41 logger.debug("[{}] requeuing job {} ...", 42 getId(), job.getId()); 38 43 } else { 39 44 continue; … … 49 54 } 50 55 } // while 51 System.err.println(getId() + " ... exit");56 logger.debug("[{}] exiting ...", getId()); 52 57 } 58 53 59 54 60 void shutdown() { … … 57 63 } 58 64 } 65 private static final Logger logger = 66 LoggerFactory.getLogger(ParallelHarvester.class); 67 private final HashSet<WorkerThread> workers = 68 new HashSet<WorkerThread>(); 69 private final DelayQueue<HarvestJobImpl> queue = 70 new DelayQueue<HarvestJobImpl>(); 71 private final AtomicLong nextId = new AtomicLong(0); 59 72 60 private final HashSet<WorkerThread> workers = new HashSet<WorkerThread>();61 private final DelayQueue<HarvestJobImpl> queue = new DelayQueue<HarvestJobImpl>();62 private AtomicLong nextId = new AtomicLong();63 73 64 74 private ParallelHarvester() { … … 68 78 } 69 79 80 70 81 @Override 71 82 public HarvestJob createJob(URI repositoryURI, HarvestHandler handler) … … 74 85 repositoryURI, handler); 75 86 } 87 76 88 77 89 public static Harvester newInstance() { … … 83 95 } 84 96 97 85 98 @Override 86 99 public void shutdown() { … … 90 103 w.join(); 91 104 } catch (InterruptedException e) { 92 System.err.println("join fail!");105 logger.error("interrupted while shutting down"); 93 106 } 94 107 } … … 102 115 } 103 116 117 104 118 @Override 105 119 void doCancelJob(HarvestJobImpl job) throws HarvesterException { 106 120 queue.remove(job); 107 121 } 122 108 123 109 124 @Override
Note: See TracChangeset
for help on using the changeset viewer.