Changeset 1137 for OAIHarvester


Ignore:
Timestamp:
02/22/11 09:50:37 (13 years ago)
Author:
oschonef
Message:
  • major re-factoring of Harvester, HarvestJob?, their implementations, etc.
  • initial version for parallel harvester
Location:
OAIHarvester/trunk/OAIHarvester/src
Files:
1 added
9 edited

Legend:

Unmodified
Added
Removed
  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/HarvestHandler.java

    r1128 r1137  
    8484    public void onRecordAbout(Header header, XMLStreamReader reader);
    8585
    86     /* XXX: move to different interface? */
     86    /**
     87     * Optionally warp input stream.
     88     *
     89     * @param stream
     90     * @return
     91     * @throws IOException
     92     */
    8793    public InputStream wrap(InputStream stream) throws IOException;
    88    
     94
    8995} // interface HarvestHandler
  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/HarvestJob.java

    r1128 r1137  
    1313    };
    1414
    15     public String getId();
     15    public long getId();
    1616
    1717    public State getState();
     18
     19    public boolean isRunning();
    1820
    1921    public long getRecordCount();
     
    3436
    3537    public HarvestHandler getHarvestHandler();
    36 
    37     public void setHarvestHandler(HarvestHandler handler);
    3838
    3939    public Date getFrom();
  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/Harvester.java

    r1128 r1137  
    2323            throws HarvesterException;
    2424
     25    public void shutdown();
     26
    2527} // interface Harvester
  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/AbstractHarvester.java

    r1128 r1137  
    2020    private final XMLInputFactory factory;
    2121//    private final XMLValidationSchema schema;
    22    
    23    
     22
    2423    protected AbstractHarvester() {
    2524        factory = XMLInputFactory2.newInstance();
     
    8887    abstract void doCancelJob(HarvestJobImpl job) throws HarvesterException;
    8988
    90     XMLStreamReader createReader(InputStream in) throws XMLStreamException {
     89    protected abstract HarvestJob doFindJob(String id)
     90        throws HarvesterException;
     91
     92    protected final XMLStreamReader createReader(InputStream in)
     93            throws XMLStreamException {
    9194        XMLStreamReader2 reader =
    9295            (XMLStreamReader2) factory.createXMLStreamReader(in);
     
    9598    }
    9699
    97     protected abstract HarvestJob doFindJob(String id)
    98         throws HarvesterException;
    99 
    100100} // abstract class AbstractHarvester
  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestJobImpl.java

    r1128 r1137  
    88import java.util.Date;
    99import java.util.List;
    10 import java.util.concurrent.atomic.AtomicReference;
    11 
    12 import javax.xml.stream.XMLStreamException;
     10import java.util.concurrent.Delayed;
     11import java.util.concurrent.TimeUnit;
     12
    1313import javax.xml.stream.XMLStreamReader;
    1414
     
    2222import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
    2323
    24 final class HarvestJobImpl implements HarvestJob {
    25     private enum Task {
     24final class HarvestJobImpl implements HarvestJob, Delayed {
     25    public enum Task {
    2626        IDENTIFY_REPOSITORY,
    2727        ENUMERATE_METADATA_FORMATS,
    28         INITALIZE_HARVEST_RECORDS,
    2928        HARVEST_RECORDS
    3029    }
    3130    private final AbstractHarvester harvester;
    32     private final String id;
     31    private final long id;
    3332    private final URI repositoryURI;
     33    private final HarvestHandler handler;
    3434    private State state = State.NEW;
    35     private AtomicReference<HarvestHandler> handler;
    3635    private Date from;
    3736    private Date until;
     
    5352    private long timeTotal = 0;
    5453    private long timeNetwork = 0;
    55     private long timeWait = 0;
     54    private long timeDelayed = 0;
    5655    private long timeProcessing = 0;
    57     private long startTime;
     56    private long startTimestamp;
    5857    private Task task = Task.IDENTIFY_REPOSITORY;
    59     private List<String> availablePrefixes;
    60     private String resumptionToken;
    61     private Date latestDatestamp;
    62 
    63     HarvestJobImpl(AbstractHarvester harvester, String id, URI repositoryURI,
     58    private long delayUntil = 0;
     59    private int networkTry = 0;
     60    private List<String> prefixWorklist = null;
     61    private String resumptionToken = null;
     62    private Date latestDatestamp = null;
     63
     64    HarvestJobImpl(AbstractHarvester harvester, long id, URI repositoryURI,
    6465            HarvestHandler handler) {
    6566        this.harvester = harvester;
    6667        this.id = id;
    6768        this.repositoryURI = repositoryURI;
    68         this.handler = new AtomicReference<HarvestHandler>(handler);
    69     }
    70 
    71     @Override
    72     public String getId() {
     69        this.handler = handler;
     70    }
     71
     72    @Override
     73    public long getId() {
    7374        return id;
    7475    }
     
    8081
    8182    @Override
     83    public boolean isRunning() {
     84        return state == HarvestJob.State.RUNNING;
     85    }
     86
     87    @Override
    8288    public long getRecordCount() {
    8389        return recordCount;
     
    111117    @Override
    112118    public long getWaitTime() {
    113         return timeWait;
     119        return timeDelayed;
    114120    }
    115121
     
    122128    @Override
    123129    public HarvestHandler getHarvestHandler() {
    124         return handler.get();
    125     }
    126    
    127     @Override
    128     public void setHarvestHandler(HarvestHandler handler) {
    129         this.handler.set(handler);
     130        return handler;
    130131    }
    131132
     
    197198    @Override
    198199    public void cancel() throws HarvesterException {
    199         state = State.CANCELED;
     200        setState(State.CANCELED);
    200201        harvester.doCancelJob(this);
     202    }
     203
     204    @Override
     205    public long getDelay(TimeUnit unit) {
     206        return unit.convert((delayUntil - System.currentTimeMillis()),
     207                TimeUnit.MILLISECONDS);
     208    }
     209
     210    @Override
     211    public int compareTo(Delayed o) {
     212        final long me = getDelay(TimeUnit.MICROSECONDS);
     213        final long they = o.getDelay(TimeUnit.MILLISECONDS);
     214        if (me > they) {
     215            return 1;
     216        } else if (me < they) {
     217            return -1;
     218        } else {
     219            return 0;
     220        }
     221    }
     222
     223    @Override
     224    public String toString() {
     225        StringBuilder sb = new StringBuilder();
     226        sb.append(getClass().getCanonicalName());
     227        sb.append("[id=");
     228        sb.append(id);
     229        sb.append(", state=");
     230        sb.append(state);
     231        sb.append(", task=");
     232        sb.append(task);
     233        sb.append("]");
     234        return sb.toString();
    201235    }
    202236
     
    206240
    207241    void setState(State state) {
     242        if ((this.state != State.RUNNING) && (state == State.RUNNING)) {
     243            startTimestamp = System.currentTimeMillis();
     244        }
     245        if ((this.state == State.RUNNING) && (state != State.RUNNING)) {
     246            timeTotal = System.currentTimeMillis() - startTimestamp;
     247        }
    208248        this.state = state;
    209249    }
     
    257297    }
    258298
    259     void addToBytesTransferred(long bytes) {
    260         bytedTransferred += bytes;
    261     }
    262 
    263299    void addToNetworkTime(long delta) {
    264300        timeNetwork += delta;
    265301    }
    266302
    267     void addToWaitTime(long delta) {
    268         timeWait += delta;
    269     }
    270 
    271 
    272     void addToProcessingTime(long delta) {
    273        timeProcessing += delta;
     303    void finishRequest(long bytesTransferred, long timeProcessingDelta) {
     304        timeProcessing += timeProcessingDelta;
     305        bytedTransferred += bytesTransferred;
     306    }
     307
     308    boolean isState(State state) {
     309        return this.state == state;
     310    }
     311
     312    Task getTask() {
     313        return task;
     314    }
     315
     316    void setTask(Task task) {
     317        this.task = task;
     318    }
     319
     320    String getCurrentPrefix() {
     321        if (prefixWorklist == null) {
     322            prefixWorklist = new ArrayList<String>();
     323            if (metadataPrefixes != null) {
     324                prefixWorklist.addAll(metadataPrefixes);
     325            } else if (metadataFormats != null) {
     326                for (MetadataFormat format : metadataFormats) {
     327                    prefixWorklist.add(format.getPrefix());
     328                }
     329            } else {
     330                prefixWorklist.add("oai_dc");
     331            }
     332        }
     333        return prefixWorklist.isEmpty() ? null : prefixWorklist.get(0);
     334    }
     335
     336    boolean removeCurrentPrefix() {
     337        prefixWorklist.remove(0);
     338        return !prefixWorklist.isEmpty();
    274339    }
    275340
     
    282347    }
    283348
    284     String getCurrentPrefix() {
    285         return availablePrefixes.get(0);
    286     }
    287 
    288     boolean process(HarvestWorker worker) throws HarvesterException {
    289         if (state == State.CANCELED) {
    290             return true;
    291         }
    292         if (state != State.RUNNING) {
    293             throw new HarvesterException("job is not in running state");
    294         }
    295         try {
    296             switch (task) {
    297             case IDENTIFY_REPOSITORY:
    298                 startTime = System.currentTimeMillis();
    299                 worker.doIdentify(this);
    300                 task = Task.ENUMERATE_METADATA_FORMATS;
    301                 break;
    302             case ENUMERATE_METADATA_FORMATS:
    303                 worker.doListMetadataFormats(this);
    304                 task = Task.INITALIZE_HARVEST_RECORDS;
    305                 break;
    306             case INITALIZE_HARVEST_RECORDS:
    307                 if (availablePrefixes == null) {
    308                     availablePrefixes = new ArrayList<String>();
    309                     if (metadataPrefixes != null) {
    310                         availablePrefixes.addAll(metadataPrefixes);
    311                     } else if (metadataFormats != null) {
    312                         for (MetadataFormat format : metadataFormats) {
    313                             availablePrefixes.add(format.getPrefix());
    314                         }
    315                     } else {
    316                         availablePrefixes.add("oai_dc");
    317                     }
    318                 }
    319                 task = Task.HARVEST_RECORDS;
    320                 resumptionToken = null;
    321                 latestDatestamp = null;
    322                 onStartListingRecords(getCurrentPrefix());
    323                 /* FALL-TROUGH */
    324             case HARVEST_RECORDS:
    325                 boolean more = worker.doListRecords(this);
    326                 if (!more) {
    327                     onFinishListingRecords(getCurrentPrefix());
    328                     availablePrefixes.remove(0);
    329                     if (availablePrefixes.isEmpty()) {
    330                         timeTotal = System.currentTimeMillis() - startTime;
    331                         state = State.DONE;
    332                         return true;
    333                     } else {
    334                         task = Task.INITALIZE_HARVEST_RECORDS;
    335                     }
    336                 }
    337             }
    338         } catch (Throwable e) {
    339             timeTotal = System.currentTimeMillis() - startTime;
    340             state = State.ERROR;
    341             if (e instanceof HarvesterException) {
    342                 throw (HarvesterException) e;
    343             } else if (e instanceof XMLStreamException) {
    344                 throw new HarvesterException("malformed OAI response", e);
    345             } else if (e instanceof IOException) {
    346                 throw new HarvesterException("network error", e);
    347             } else {
    348                 throw new HarvesterException("unexpected exception", e);
    349             }
    350         }
    351         return false;
    352     }
    353 
    354     void onStartListingRecords(String prefix) {
    355         final HarvestHandler h = handler.get();
    356         if (h != null) {
    357             h.onStartListingRecords(prefix);
    358         }
    359     }
    360 
    361     void onFinishListingRecords(String prefix) {
    362         final HarvestHandler h = handler.get();
    363         if (h != null) {
    364             h.onFinishListingRecords(prefix, latestDatestamp);
    365         }
     349    boolean isHarvestingNewPrefix() {
     350        return (resumptionToken == null) && (networkTry == 0);
     351    }
     352
     353    int incNetworkRequestCount() {
     354        return ++networkTry;
     355    }
     356   
     357    int getNetworkRequestCount() {
     358        return networkTry;
     359    }
     360
     361    void resetNetworkRequestCount() {
     362        delayUntil = 0;
     363        networkTry = 0;
     364    }
     365
     366    void setNetworkRequestDelay(long delay) {
     367        this.timeDelayed += delay;
     368        this.delayUntil = System.currentTimeMillis() + delay;
     369    }
     370
     371    void onStartListingRecords() {
     372        if (handler != null) {
     373            handler.onStartListingRecords(getCurrentPrefix());
     374        }
     375    }
     376
     377    void onFinishListingRecords() {
     378        if (handler != null) {
     379            handler.onFinishListingRecords(getCurrentPrefix(), latestDatestamp);
     380        }
     381        // reset ...
     382        resumptionToken = null;
     383        latestDatestamp = null;
    366384    }
    367385
     
    375393            }
    376394        }
    377         final HarvestHandler h = handler.get();
    378         if (h != null) {
    379             h.onRecord(header);
     395        if (handler != null) {
     396            handler.onRecord(header);
    380397        }
    381398    }
    382399
    383400    void onRecordMetadata(Header header, XMLStreamReader reader) {
    384         final HarvestHandler h = handler.get();
    385         if (h != null) {
    386             h.onRecordMetadata(header, reader);
     401        if (handler != null) {
     402            handler.onRecordMetadata(header, reader);
    387403        }
    388404    }
    389405
    390406    void onRecordAbout(Header header, XMLStreamReader reader) {
    391         final HarvestHandler h = handler.get();
    392         if (h != null) {
    393             h.onRecordAbout(header, reader);
     407        if (handler != null) {
     408            handler.onRecordAbout(header, reader);
    394409        }
    395410    }
    396411
    397412    void onIdentify() {
    398         final HarvestHandler h = handler.get();
    399         if (h != null) {
     413        if (handler != null) {
    400414            final Repository repository = new Repository() {
    401415                @Override
     
    439453                }
    440454            };
    441             h.onIdentify(repository);
     455            handler.onIdentify(repository);
    442456        }
    443457    }
     
    445459    void onListMetadataFormats(List<MetadataFormat> metadataFormats) {
    446460        this.metadataFormats = metadataFormats;
    447         final HarvestHandler h = handler.get();
    448         if (h != null) {
    449             h.onListMetadataFormats(this.metadataFormats);
     461        if (handler != null) {
     462            handler.onListMetadataFormats(this.metadataFormats);
    450463        }
    451464    }
    452465
    453466    InputStream wrap(InputStream stream) throws IOException {
    454         final HarvestHandler h = handler.get();
    455         if (h != null) {
    456             return h.wrap(stream);
     467        if (handler != null) {
     468            return handler.wrap(stream);
    457469        } else {
    458470            return stream;
     
    460472    }
    461473
    462 } // class HarvestJob
     474} // class HarvestJobImpl
  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java

    r1128 r1137  
    2323import org.joda.time.format.DateTimeFormatter;
    2424
     25import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
    2526import eu.clarin.cmdi.oai.harvester.HarvesterException;
    2627import eu.clarin.cmdi.oai.harvester.MetadataFormat;
     
    2829import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
    2930import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
     31import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;
    3032
    3133public class HarvestWorker {
     
    5658    private final HeaderImpl header;
    5759    private final XmlStreamReaderProxy readerWrapper;
    58     private int retries = 5;
     60    private long delay;
    5961
    6062    HarvestWorker(AbstractHarvester harvester) {
     
    7173    }
    7274
    73     void doIdentify(HarvestJobImpl job) throws IOException, XMLStreamException,
    74             HarvesterException {
     75    long process(HarvestJobImpl job, int maxNetworkTries)
     76            throws HarvesterException {
     77        if (job.isState(State.CANCELED)) {
     78            return -1;
     79        }
     80        if (!job.isState(State.RUNNING)) {
     81            throw new HarvesterException("job is not in running state");
     82        }
     83
     84        boolean success;
     85        try {
     86            switch (job.getTask()) {
     87            case IDENTIFY_REPOSITORY:
     88                success = doIdentifyTask(job);
     89                if (success) {
     90                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
     91                }
     92                break;
     93            case ENUMERATE_METADATA_FORMATS:
     94                success = doListMetadataFormatsTask(job);
     95                if (success) {
     96                    job.setTask(Task.HARVEST_RECORDS);
     97                }
     98                break;
     99            case HARVEST_RECORDS:
     100                // check for new prefix and notify start, if needed
     101                if (job.isHarvestingNewPrefix()) {
     102                    job.onStartListingRecords();
     103                }
     104                success = doListRecordsTask(job);
     105                if (success) {
     106                    // notify end of prefix
     107                    job.onFinishListingRecords();
     108                    if (!job.removeCurrentPrefix()) {
     109                        job.setState(State.DONE);
     110                    }
     111                }
     112                break;
     113            default:
     114                throw new HarvesterException("internal error");
     115            } // switch
     116
     117            if (success) {
     118                job.resetNetworkRequestCount();
     119            } else {
     120                if (delay > 0) {
     121                    job.setNetworkRequestDelay(delay);
     122                }
     123                if (job.incNetworkRequestCount() >= maxNetworkTries) {
     124                    throw new HarvesterException("too many network retries: " +
     125                            job.getNetworkRequestCount());
     126                }
     127            }
     128            return delay;
     129        } catch (Throwable e) {
     130            job.setState(State.ERROR);
     131            if (e instanceof HarvesterException) {
     132                throw (HarvesterException) e;
     133            } else if (e instanceof XMLStreamException) {
     134                throw new HarvesterException("malformed OAI response", e);
     135            } else if (e instanceof IOException) {
     136                throw new HarvesterException("network error", e);
     137            } else {
     138                throw new HarvesterException("unexpected exception", e);
     139            }
     140        }
     141    }
     142
     143    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
     144            XMLStreamException, HarvesterException {
    75145        Response response = null;
    76146        try {
    77147            uriBuilder.setVerb(VERB_IDENTIFY);
    78148            response = execute(job, false);
     149            if (response == null) {
     150                return false;
     151            }
    79152
    80153            response.readStart(OAI_NS, "Identify", true);
     
    154227           
    155228            job.onIdentify();
     229
     230            return true;
    156231        } finally {
    157232            if (response != null) {
    158                 response.release();
    159             }
    160         }
    161     }
    162 
    163     void doListMetadataFormats(HarvestJobImpl job) throws IOException,
    164             XMLStreamException, HarvesterException {
     233                response.release(job);
     234            }
     235        }
     236    }
     237
     238    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
     239            throws IOException, XMLStreamException, HarvesterException {
    165240        Response response = null;
    166241        try {
    167242            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
    168243            response = execute(job, true);
     244            if (response == null) {
     245                return false;
     246            }
    169247
    170248            response.readStart(OAI_NS, "ListMetadataFormats", true);
     
    187265
    188266            job.onListMetadataFormats(metadataFormats);
     267           
     268            return true;
    189269        } finally {
    190270            if (response != null) {
    191                 response.release();
    192             }
    193         }
    194     }
    195 
    196     boolean doListRecords(HarvestJobImpl job) throws IOException,
     271                response.release(job);
     272            }
     273        }
     274    }
     275
     276    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
    197277            XMLStreamException, HarvesterException {
    198278        Response response = null;
     
    220300
    221301            response = execute(job, true);
     302            if (response == null) {
     303                return false;
     304            }
     305
    222306            String nextToken = null;
    223307
     
    232316                        header.setDeleted(true);
    233317                    } else {
    234                         throw new XMLStreamException("attribute 'status of' " +
     318                        throw new XMLStreamException("attribute 'status' of " +
    235319                                "element 'record' must contain 'deleted'");
    236320                    }
     
    283367            response.close();
    284368
    285             return (nextToken != null);
     369            return (nextToken == null);
    286370        } finally {
    287371            if (response != null) {
    288                 response.release();
     372                response.release(job);
    289373            }
    290374        }
     
    294378            throws IOException, XMLStreamException, HarvesterException {
    295379        long now = System.currentTimeMillis();
    296         do {
    297             job.incRequestCount();
    298             final URI uri = uriBuilder.createURI(job.getRepositoryURI());
    299             System.err.println("HTTP GET " + uri);
    300             final HttpGet request = new HttpGet(uri);
    301             if (compress) {
    302                 if ((job.getCompressionMask() &
    303                         Repository.COMPRESSION_METHOD_GZIP) > 0) {
    304                     request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
    305                 }
    306                 if ((job.getCompressionMask() &
    307                         Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
    308                     request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
    309                 }
    310                 request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
    311             }
    312             request.addHeader(HEADER_CONNECTION, "close");
    313             // FIXME: get timeout values from somewhere?
    314             HttpParams params = request.getParams();
    315             params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000);
    316             params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
    317 
    318             final HttpResponse response = client.execute(request);
    319             long then = System.currentTimeMillis();
    320             job.addToNetworkTime(then - now);
    321             int status = response.getStatusLine().getStatusCode();
    322             if (status == HttpStatus.SC_OK) {
    323                 return new Response(job, response.getEntity(), harvester);
    324             } else {
    325                 request.abort();
    326                 if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
    327                     if (retries > 0) {
    328                         System.err.println("UNAVAILABLE, wait & retry, " +
    329                                 (retries - 1) + " left ...");
    330                         try {
    331                             final long sleep = parseTryAfter(response);
    332                             System.err.println("-> sleep " + sleep);
    333                             Thread.sleep(sleep);
    334                             now = System.currentTimeMillis();
    335                             job.addToWaitTime(now - then);
    336                         } catch (InterruptedException e) {
    337                             /* IGNORE */
    338                         }
    339                         continue;
    340                     } else {
    341                         throw new HarvesterException("too many retries");
    342                     }
    343                 } else {
    344                     throw new HarvesterException(
    345                             "provider returned unexpected HTTP status: " +
    346                                     status);
    347                 }
    348             }
    349         } while (--retries > 0);
    350         throw new HarvesterException("to many retries");
     380        job.incRequestCount();
     381        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
     382        System.err.println("HTTP GET " + uri);
     383        final HttpGet request = new HttpGet(uri);
     384        if (compress) {
     385            if ((job.getCompressionMask() &
     386                    Repository.COMPRESSION_METHOD_GZIP) > 0) {
     387                request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
     388            }
     389            if ((job.getCompressionMask() &
     390                    Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
     391                request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
     392            }
     393            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
     394        }
     395        request.addHeader(HEADER_CONNECTION, "close");
     396        // FIXME: get timeout values from somewhere?
     397        HttpParams params = request.getParams();
     398        params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000);
     399        params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
     400
     401        final HttpResponse response = client.execute(request);
     402        job.addToNetworkTime(System.currentTimeMillis() - now);
     403        int status = response.getStatusLine().getStatusCode();
     404        if (status == HttpStatus.SC_OK) {
     405            delay = 0;
     406            return new Response(job, response.getEntity(), harvester);
     407        } else {
     408            request.abort();
     409            if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
     410                System.err.println("-> " + status);
     411                delay = parseTryAfter(response);
     412            } else {
     413                throw new HarvesterException(
     414                        "provider returned unexpected HTTP status: " + status);
     415            }
     416        }
     417        return null;
    351418    }
    352419
  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/Response.java

    r1134 r1137  
    6666    private static final String OAI_NS =
    6767        "http://www.openarchives.org/OAI/2.0/";
    68     private final HarvestJobImpl job;
    6968    private final HttpEntity entity;
    7069    private final long now = System.currentTimeMillis();
     
    7473    Response(HarvestJobImpl job, HttpEntity entity, AbstractHarvester harvester)
    7574            throws IOException, XMLStreamException, HarvesterException {
    76         this.job = job;
    7775        this.entity = entity;
    7876
     
    122120    }
    123121
    124     public void release() {
    125         job.addToProcessingTime(System.currentTimeMillis() - now);
    126         job.addToBytesTransferred(stream.getByteCount());
     122    public void release(HarvestJobImpl job) {
     123        job.finishRequest(stream.getByteCount(),
     124                (System.currentTimeMillis() - now));
    127125        try {
    128126            reader.close();
  • OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/SimpleHarvester.java

    r1128 r1137  
    22
    33import java.net.URI;
     4import java.util.concurrent.TimeUnit;
    45
    56import eu.clarin.cmdi.oai.harvester.HarvestHandler;
     
    2122            throw new IllegalArgumentException("repositoryURI == null");
    2223        }
    23         String jobId;
     24        long jobId;
    2425        synchronized (this) {
    25             jobId = "0x" + Long.toHexString(++nextId);
     26            jobId = ++nextId;
    2627        } // synchronized
    2728        return new HarvestJobImpl(this, jobId, repositoryURI, handler);
     
    3031    public static Harvester newInstance() {
    3132        return new SimpleHarvester();
     33    }
     34
     35    @Override
     36    public void shutdown() {
    3237    }
    3338
     
    4045
    4146        job.setState(HarvestJob.State.RUNNING);
    42         boolean done = false;
    43         while (!done) {
    44             done = job.process(worker);
     47
     48        while (job.isRunning()) {
     49            worker.process(job, 15);
     50            long delay = -1;
     51            while (job.isRunning() &&
     52                    ((delay = job.getDelay(TimeUnit.MILLISECONDS)) > 0)) {
     53                try {
     54                    System.err.println(job.getId() + ": (" +
     55                            job.getNetworkRequestCount() + ") waiting " +
     56                            delay + " ...");
     57                    Thread.sleep(delay);
     58                } catch (InterruptedException e) {
     59                }
     60                delay = job.getDelay(TimeUnit.MILLISECONDS);
     61            } // while (wait loop)
    4562        } // while
    4663    }
  • OAIHarvester/trunk/OAIHarvester/src/test/java/eu/clarin/cmdi/oai/harvester/HarvesterTest.java

    r1134 r1137  
    1818import javax.xml.stream.XMLStreamWriter;
    1919
    20 import eu.clarin.cmdi.oai.harvester.HarvestHandlerAdapter;
    2120import eu.clarin.cmdi.oai.harvester.impl.SimpleHarvester;
    2221import eu.clarin.cmdi.oai.harvester.util.XMLStreamCopier;
     
    3130        private int reqNum;
    3231        private String prefix;
     32        private byte[] buffer = new byte[8192];
    3333
    3434        public MyHarvestHandler(File file) {
     
    161161            ZipEntry entry = new ZipEntry(filename);
    162162            output.putNextEntry(entry);
    163             byte[] buffer = new byte[8192];
    164163            int r = -1;
    165164            do {
     
    188187        final String repos = args[0];
    189188        MyHarvestHandler handler = new MyHarvestHandler(new File(args[1]));
     189
     190        Harvester harvester = SimpleHarvester.newInstance();
    190191        try {
    191             Harvester harvester = SimpleHarvester.newInstance();
    192192            HarvestJob job = harvester.createJob(repos, handler);
    193193//            Calendar cal = Calendar.getInstance(TimeZone.getDefault());
     
    197197            job.setMetadataPrefixes(Arrays.asList("oai_dc"));
    198198            job.run();
     199
     200            while (job.isRunning()) {
     201                try {
     202                    Thread.sleep(1500);
     203                } catch (InterruptedException e) {
     204                }
     205            }
    199206
    200207            System.err.println("==> " + job.getState() + ": "+
     
    219226        } finally {
    220227            handler.close();
     228            harvester.shutdown();
    221229        }
    222230    }
Note: See TracChangeset for help on using the changeset viewer.