Timestamp: 06/10/13 19:50:23
Author: oschonef
Message:
  • update Woodstox, SLF4J and Apache Commons HTTP client dependencies
  • several interoperability fixes (including relaxed behavior)
  • several bug fixes
File: 1 edited

  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/ParallelHarvester.java
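
The diff below replaces the ad-hoc System.err.println debugging with SLF4J calls, matching the SLF4J dependency update noted in the commit message. As a minimal sketch of the parameterized-logging pattern being adopted (the class name and values here are illustrative, not taken from the harvester):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
    // one logger per class, resolved through whatever SLF4J backend is bound
    private static final Logger logger =
            LoggerFactory.getLogger(LoggingSketch.class);

    public static void main(String[] args) {
        final long jobId = 42; // illustrative value
        // {} placeholders are substituted only when DEBUG is enabled,
        // so there is no string-concatenation cost at higher log levels
        logger.debug("[{}] got job {}", Thread.currentThread().getId(), jobId);
        logger.error("interrupted while shutting down");
    }
}

The same pattern appears throughout the changeset, with the logger obtained via LoggerFactory.getLogger(ParallelHarvester.class).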

--- r1148
+++ r3011
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import eu.clarin.cmdi.oai.harvester.HarvestHandler;
 import eu.clarin.cmdi.oai.harvester.HarvestJob;
…
 import eu.clarin.cmdi.oai.harvester.HarvesterException;
 
+
 public class ParallelHarvester extends AbstractHarvester {
     private final class WorkerThread extends Thread {
-        private final HarvestWorker worker =
-            new HarvestWorker(ParallelHarvester.this);
+        private final HarvestWorker worker = new HarvestWorker(
+                ParallelHarvester.this);
         private AtomicBoolean loop = new AtomicBoolean(true);
-
+
+
         @Override
         public void run() {
             while (loop.get()) {
                 try {
-                    System.err.println(getId() + " waiting ...");
-                    HarvestJobImpl job = queue.take();
-                    System.err.println(getId() + " got job " + job.getId());
+                    logger.debug("[{}] waiting ...", getId());
+                    final HarvestJobImpl job = queue.take();
+                    logger.debug("[{}] got job {}", getId(), job.getId());
                     try {
                         while (true) {
-                            System.err.println(getId() + " process job " +
-                                    job.getId());
+                            logger.debug("[{}] processing job {} ...",
+                                    getId(), job.getId());
                             worker.process(job, maxNetworkRetryCount);
                             if (job.isRunning()) {
                                 if (job.getDelay(TimeUnit.MILLISECONDS) > 0) {
                                     queue.offer(job);
-                                    System.err.println(getId() +
-                                            " requeuing job " + job.getId());
+                                    logger.debug("[{}] requeuing job {} ...",
+                                            getId(), job.getId());
                                 } else {
                                     continue;
…
                 }
             } // while
-            System.err.println(getId() + " ... exit");
+            logger.debug("[{}] exiting ...", getId());
         }
+
 
         void shutdown() {
…
         }
     }
+    private static final Logger logger =
+            LoggerFactory.getLogger(ParallelHarvester.class);
+    private final HashSet<WorkerThread> workers =
+            new HashSet<WorkerThread>();
+    private final DelayQueue<HarvestJobImpl> queue =
+            new DelayQueue<HarvestJobImpl>();
+    private final AtomicLong nextId = new AtomicLong(0);
 
-    private final HashSet<WorkerThread> workers = new HashSet<WorkerThread>();
-    private final DelayQueue<HarvestJobImpl> queue = new DelayQueue<HarvestJobImpl>();
-    private AtomicLong nextId = new AtomicLong();
 
     private ParallelHarvester() {
…
     }
 
+
     @Override
     public HarvestJob createJob(URI repositoryURI, HarvestHandler handler)
…
                 repositoryURI, handler);
     }
+
 
     public static Harvester newInstance() {
…
     }
 
+
     @Override
     public void shutdown() {
…
                 w.join();
             } catch (InterruptedException e) {
-                System.err.println("join fail!");
+                logger.error("interrupted while shutting down");
             }
         }
…
     }
 
+
     @Override
     void doCancelJob(HarvestJobImpl job) throws HarvesterException {
         queue.remove(job);
     }
+
 
     @Override
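
The worker loop shown above is built around java.util.concurrent.DelayQueue: take() blocks until the head job's delay has elapsed, and a job that is still running with a positive remaining delay is put back with offer(). A minimal sketch of that pattern, with a hypothetical SketchJob standing in for HarvestJobImpl:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueWorkerSketch {
    // Hypothetical stand-in for HarvestJobImpl: becomes due after a fixed delay.
    static class SketchJob implements Delayed {
        private final long id;
        private final long dueAt; // absolute due time in milliseconds

        SketchJob(long id, long delayMillis) {
            this.id = id;
            this.dueAt = System.currentTimeMillis() + delayMillis;
        }

        long getId() {
            return id;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAt - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                    other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final DelayQueue<SketchJob> queue = new DelayQueue<SketchJob>();
        queue.offer(new SketchJob(1, 500));

        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // take() blocks until the next job's delay has elapsed
                    SketchJob job = queue.take();
                    System.out.println("processing job " + job.getId());
                    // a real worker would requeue the job here if it were
                    // still running and reported a positive remaining delay
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();
        worker.join();
    }
}

Because DelayQueue releases elements only once their delay has expired, idle workers block in take() instead of polling, which is what makes the requeue-with-delay approach in this changeset cheap.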