source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestJobImpl.java @ 1163

Last change on this file since 1163 was 1163, checked in by oschonef, 13 years ago
  • convert ealiestDatestamp to Date in Identify
  • add pluggable parser architecture for additional descriptions in Identify
  • add description parsers for OAI-identifier and OAI Dublin Core descriptions
  • Property svn:eol-style set to native
File size: 16.1 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.io.InputStream;
5import java.net.URI;
6import java.util.ArrayList;
7import java.util.Collections;
8import java.util.Date;
9import java.util.List;
10import java.util.concurrent.Delayed;
11import java.util.concurrent.TimeUnit;
12
13import javax.xml.stream.XMLStreamReader;
14
15import eu.clarin.cmdi.oai.harvester.Description;
16import eu.clarin.cmdi.oai.harvester.HarvestHandler;
17import eu.clarin.cmdi.oai.harvester.HarvestJob;
18import eu.clarin.cmdi.oai.harvester.HarvesterException;
19import eu.clarin.cmdi.oai.harvester.Header;
20import eu.clarin.cmdi.oai.harvester.MetadataFormat;
21import eu.clarin.cmdi.oai.harvester.Repository;
22import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
23import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
24
25final class HarvestJobImpl implements HarvestJob, Delayed {
26    public enum Task {
27        IDENTIFY_REPOSITORY,
28        ENUMERATE_METADATA_FORMATS,
29        HARVEST_RECORDS
30    }
31    private static final class StatisticsImpl implements Statistics {
32        private final String prefix;
33        private long recordCount = 0;
34        private int requestCount = 0;
35        private int resumptionCount = 0;
36        private long bytedTransferred = 0;
37        private Date latestDatestamp = null;
38       
39        private StatisticsImpl(String prefix) {
40            this.prefix = prefix;
41        }
42
43        @Override
44        public String getPrefix() {
45            return prefix;
46        }
47
48        @Override
49        public long getRecordCount() {
50            return recordCount;
51        }
52
53        private void incRecordCount() {
54            this.recordCount++;
55        }
56
57        @Override
58        public int getRequestCount() {
59            return requestCount;
60        }
61
62        private void incRequestCount() {
63            this.requestCount++;
64        }
65
66        @Override
67        public int getResumptionCount() {
68            return resumptionCount;
69        }
70
71        private void incResumptionCount() {
72            this.resumptionCount++;
73        }
74
75        @Override
76        public long getBytesTransferred() {
77            return bytedTransferred;
78        }
79   
80        private void incBytesTransferred(long bytes) {
81            this.bytedTransferred += bytes;
82        }
83
84        @Override
85        public Date getLatestDatestamp() {
86            return latestDatestamp;
87        }
88       
89        private void updateLatestDatestmp(Date latestDatestamp) {
90            if (this.latestDatestamp == null) {
91                this.latestDatestamp = latestDatestamp;
92            } else {
93                if (latestDatestamp.compareTo(this.latestDatestamp) > 0) {
94                    this.latestDatestamp = latestDatestamp;
95                }
96            }
97        }
98    } // inner class Statistics
99    private final AbstractHarvester harvester;
100    private final long id;
101    private final URI repositoryURI;
102    private final HarvestHandler handler;
103    private State state = State.NEW;
104    private Date from;
105    private Date until;
106    private String set;
107    private List<String> metadataPrefixes;
108    private String repositoryName;
109    private String baseURL;
110    private String protocolVersion;
111    private List<String> adminEmail = Collections.emptyList();
112    private Date earliestTimestamp;
113    private DeletedNotion deletedNotion;
114    private Granularity granularity;
115    private int compressionMask;
116    private List<Description> descriptions;
117    private List<MetadataFormat> metadataFormats;
118    private long totalRecordCount = 0;
119    private int totalRequestCount = 0;
120    private long totalBytesTransferred = 0;
121    private long timeTotal = 0;
122    private long timeNetwork = 0;
123    private long timeDelayed = 0;
124    private long timeProcessing = 0;
125    private long startTimestamp;
126    private List<Statistics> statistics = null;
127    private Task task = Task.IDENTIFY_REPOSITORY;
128    private long delayUntil = 0;
129    private int networkRetryCount = 0;
130    private List<String> prefixWorklist = null;
131    private String resumptionToken = null;
132
133    HarvestJobImpl(AbstractHarvester harvester, long id, URI repositoryURI,
134            HarvestHandler handler) {
135        this.harvester = harvester;
136        this.id = id;
137        this.repositoryURI = repositoryURI;
138        this.handler = handler;
139    }
140
141    @Override
142    public long getId() {
143        return id;
144    }
145
146    @Override
147    public State getState() {
148        return state;
149    }
150
151    @Override
152    public boolean isRunning() {
153        return state == HarvestJob.State.RUNNING;
154    }
155
156    @Override
157    public long getTotelRecordCount() {
158        return totalRecordCount;
159    }
160
161    @Override
162    public int getTotelRequestCount() {
163        return totalRequestCount;
164    }
165
166    @Override
167    public long getTotalBytesTransferred() {
168        return totalBytesTransferred;
169    }
170
171    @Override
172    public long getTotalTime() {
173        return timeTotal;
174    }
175
176    @Override
177    public long getNetworkTime() {
178        return timeNetwork;
179    }
180   
181    @Override
182    public long getWaitTime() {
183        return timeDelayed;
184    }
185
186    @Override
187    public long getProcessingTime() {
188        return timeProcessing;
189    }
190
191   
192    @Override
193    public HarvestHandler getHarvestHandler() {
194        return handler;
195    }
196
197    @Override
198    public Date getFrom() {
199        return from;
200    }
201   
202    @Override
203    public void setFrom(Date from) {
204        if (state != State.NEW) {
205            throw new IllegalStateException("state != new");
206        }
207        this.from = from;
208    }
209   
210    @Override
211    public Date getUntil() {
212        return until;
213    }
214   
215    @Override
216    public void setUntil(Date until) {
217        if (state != State.NEW) {
218            throw new IllegalStateException("state != new");
219        }
220        this.until = until;
221    }
222
223    @Override
224    public String getSet() {
225        return set;
226    }
227
228    @Override
229    public void setSet(String set) {
230        if (state != State.NEW) {
231            throw new IllegalStateException("state != new");
232        }
233        if ((set != null) && !set.isEmpty()) {
234            this.set = set;
235        } else {
236            this.set = null;
237        }
238    }
239
240    @Override
241    public List<String> getMetadataPrefixes() {
242        return metadataPrefixes;
243    }
244   
245    @Override
246    public void setMetadataPrefixes(List<String> metadataPrefixes) {
247        if (state != State.NEW) {
248            throw new IllegalStateException("state != new");
249        }
250        if ((metadataPrefixes != null) && !metadataPrefixes.isEmpty()) {
251            this.metadataPrefixes = metadataPrefixes;
252        } else {
253            this.metadataPrefixes = null;
254        }
255    }
256   
257    @Override
258    public Statistics getStatistics(String prefix) {
259        if (prefix == null) {
260            throw new NullPointerException("prefix == null");
261        }
262        if ((state == State.NEW) || (state == State.RUNNING)) {
263            throw new IllegalStateException("invalid state");
264        }
265        for (Statistics stats : statistics) {
266            if (prefix.equals(stats.getPrefix())) {
267                return stats;
268            }
269        }
270        return null;
271    }
272
273    @Override
274    public List<Statistics> getStatistics() {
275        if ((state == State.NEW) || (state == State.RUNNING)) {
276            throw new IllegalStateException("invalid state");
277        }
278        if ((statistics != null) && !statistics.isEmpty()) {
279            return Collections.unmodifiableList(statistics);
280        } else {
281            return Collections.emptyList();
282        }
283    }
284
285    @Override
286    public void run() throws HarvesterException {
287        harvester.doRunJob(this);
288    }
289
290    @Override
291    public void cancel() throws HarvesterException {
292        setState(State.CANCELED);
293        harvester.doCancelJob(this);
294    }
295
296    @Override
297    public long getDelay(TimeUnit unit) {
298        return unit.convert((delayUntil - System.currentTimeMillis()),
299                TimeUnit.MILLISECONDS);
300    }
301
302    @Override
303    public int compareTo(Delayed o) {
304        final long me = getDelay(TimeUnit.MICROSECONDS);
305        final long they = o.getDelay(TimeUnit.MILLISECONDS);
306        if (me > they) {
307            return 1;
308        } else if (me < they) {
309            return -1;
310        } else {
311            return 0;
312        }
313    }
314
315    @Override
316    public String toString() {
317        StringBuilder sb = new StringBuilder();
318        sb.append(getClass().getCanonicalName());
319        sb.append("[id=");
320        sb.append(id);
321        sb.append(", state=");
322        sb.append(state);
323        sb.append(", task=");
324        sb.append(task);
325        sb.append("]");
326        return sb.toString();
327    }
328
329    public URI getRepositoryURI() {
330        return repositoryURI;
331    }
332
333    void setState(State state) {
334        if ((this.state != State.RUNNING) && (state == State.RUNNING)) {
335            startTimestamp = System.currentTimeMillis();
336        }
337        if ((this.state == State.RUNNING) && (state != State.RUNNING)) {
338            timeTotal = System.currentTimeMillis() - startTimestamp;
339        }
340        this.state = state;
341    }
342
343    void setRepositoryName(String repositoryName) {
344        this.repositoryName = repositoryName;
345    }
346
347    void setBaseURL(String baseURL) {
348        this.baseURL = baseURL;
349    }
350
351    void setProtocolVersion(String protocolVersion) {
352        this.protocolVersion = protocolVersion;
353    }
354   
355    void setAdminEmail(List<String> adminEmail) {
356        this.adminEmail = adminEmail;
357    }
358
359    void setEarliestTimestamp(Date earliestTimestamp) {
360        this.earliestTimestamp = earliestTimestamp;
361    }
362
363    void setDeletedNotion(DeletedNotion deletedNotion) {
364        this.deletedNotion = deletedNotion;
365    }
366
367    void setGranularity(Granularity granularity) {
368        this.granularity = granularity;
369    }
370
371    Granularity getGranularity() {
372        return granularity;
373    }
374
375    void setCompressionMask(int compressionMask) {
376        this.compressionMask = compressionMask;
377    }
378   
379    int getCompressionMask() {
380        return compressionMask;
381    }
382
383    void setDescriptions(List<Description> descriptions) {
384        this.descriptions = descriptions;
385    }
386
387    void incRequestCount() {
388        totalRequestCount++;
389        final StatisticsImpl stats = getCurrentStatistics();
390        if (stats != null) {
391            stats.incRequestCount();
392        }
393    }
394
395    void incResumptionCount() {
396        final StatisticsImpl stats = getCurrentStatistics();
397        if (stats != null) {
398            stats.incResumptionCount();
399        }
400    }
401
402    void addToNetworkTime(long delta) {
403        timeNetwork += delta;
404    }
405
406    void finishRequest(long bytesTransferred, long timeProcessingDelta) {
407        timeProcessing += timeProcessingDelta;
408        totalBytesTransferred += bytesTransferred;
409        final StatisticsImpl stats = getCurrentStatistics();
410        if (stats != null) {
411            stats.incBytesTransferred(bytesTransferred);
412        }
413    }
414
415    boolean isState(State state) {
416        return this.state == state;
417    }
418
419    Task getTask() {
420        return task;
421    }
422
423    void setTask(Task task) {
424        this.task = task;
425    }
426
427    String getCurrentPrefix() {
428        if (prefixWorklist == null) {
429            prefixWorklist = new ArrayList<String>();
430            if (metadataPrefixes != null) {
431                prefixWorklist.addAll(metadataPrefixes);
432            } else if (metadataFormats != null) {
433                for (MetadataFormat format : metadataFormats) {
434                    prefixWorklist.add(format.getPrefix());
435                }
436            } else {
437                prefixWorklist.add("oai_dc");
438            }
439        }
440        return prefixWorklist.isEmpty() ? null : prefixWorklist.get(0);
441    }
442
443    boolean removeCurrentPrefix() {
444        prefixWorklist.remove(0);
445        return !prefixWorklist.isEmpty();
446    }
447
448    String getResumptionToken() {
449        return resumptionToken;
450    }
451   
452    void setResumptionToken(String resumptionToken) {
453        this.resumptionToken = resumptionToken;
454    }
455
456    boolean isHarvestingNewPrefix() {
457        return (resumptionToken == null) && (networkRetryCount == 0);
458    }
459
460    int incNetworkRetryCount() {
461        return ++networkRetryCount;
462    }
463   
464    int getNetworkRetryCount() {
465        return networkRetryCount;
466    }
467
468    void resetNetworkRetryCount() {
469        delayUntil = 0;
470        networkRetryCount = 0;
471    }
472
473    void setNetworkRequestDelay(long delay) {
474        this.timeDelayed += delay;
475        this.delayUntil = System.currentTimeMillis() + delay;
476    }
477
478    void onStartListingRecords() {
479        final String prefix = getCurrentPrefix();
480        if (statistics == null) {
481            statistics = new ArrayList<Statistics>();
482        }
483        statistics.add(0, new StatisticsImpl(prefix));
484
485        if (handler != null) {
486            handler.onStartListingRecords(prefix);
487        }
488    }
489
490    void onFinishListingRecords() {
491        Date latestDatestamp = null;
492
493        final StatisticsImpl stats = getCurrentStatistics();
494        if (stats != null) {
495            latestDatestamp = stats.getLatestDatestamp();
496        }
497        if (handler != null) {
498            handler.onFinishListingRecords(getCurrentPrefix(), latestDatestamp);
499        }
500        // reset ...
501        resumptionToken = null;
502    }
503
504    void onRecord(Header header) {
505        totalRecordCount++;
506        StatisticsImpl stats = getCurrentStatistics();
507        if (stats != null) {
508            stats.incRecordCount();
509            stats.updateLatestDatestmp(header.getDatestamp());
510        }
511        if (handler != null) {
512            handler.onRecord(header);
513        }
514    }
515
516    void onRecordMetadata(Header header, XMLStreamReader reader) {
517        if (handler != null) {
518            handler.onRecordMetadata(header, reader);
519        }
520    }
521
522    void onRecordAbout(Header header, XMLStreamReader reader) {
523        if (handler != null) {
524            handler.onRecordAbout(header, reader);
525        }
526    }
527
528    void onIdentify() {
529        if (handler != null) {
530            final Repository repository = new Repository() {
531                @Override
532                public String getRepositoryName() {
533                    return repositoryName;
534                }
535               
536                @Override
537                public String getProtocolVersion() {
538                    return protocolVersion;
539                }
540               
541                @Override
542                public Granularity getGranularity() {
543                    return granularity;
544                }
545               
546                @Override
547                public Date getEarliestTimestamp() {
548                    return earliestTimestamp;
549                }
550               
551                @Override
552                public DeletedNotion getDeletedNotion() {
553                    return deletedNotion;
554                }
555               
556                @Override
557                public int getCompressionMask() {
558                    return compressionMask;
559                }
560               
561                @Override
562                public String getBaseURL() {
563                    return baseURL;
564                }
565               
566                @Override
567                public List<String> getAdminEmail() {
568                    return (adminEmail != null) ?
569                            Collections.unmodifiableList(adminEmail) : null;
570                }
571
572                @Override
573                public List<Description> getDescriptions() {
574                    return (descriptions != null) ?
575                            Collections.unmodifiableList(descriptions) : null;
576                }
577            };
578            handler.onIdentify(repository);
579        }
580    }
581
582    void onListMetadataFormats(List<MetadataFormat> metadataFormats) {
583        this.metadataFormats = metadataFormats;
584        if (handler != null) {
585            handler.onListMetadataFormats(
586                    Collections.unmodifiableList(this.metadataFormats));
587        }
588    }
589
590    InputStream wrap(InputStream stream) throws IOException {
591        if (handler != null) {
592            return handler.wrap(stream);
593        } else {
594            return stream;
595        }
596    }
597
598    private StatisticsImpl getCurrentStatistics() {
599        if ((statistics != null) && !statistics.isEmpty()) {
600            return (StatisticsImpl) statistics.get(0);
601        }
602        return null;
603    }
604
605} // class HarvestJobImpl
Note: See TracBrowser for help on using the repository browser.