source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestJobImpl.java @ 1144

Last change on this file since 1144 was 1144, checked in by oschonef, 13 years ago
  • add per prefix statistics
  • Property svn:eol-style set to native
File size: 15.5 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.io.InputStream;
5import java.net.URI;
6import java.util.ArrayList;
7import java.util.Collections;
8import java.util.Date;
9import java.util.List;
10import java.util.concurrent.Delayed;
11import java.util.concurrent.TimeUnit;
12
13import javax.xml.stream.XMLStreamReader;
14
15import eu.clarin.cmdi.oai.harvester.HarvestHandler;
16import eu.clarin.cmdi.oai.harvester.HarvestJob;
17import eu.clarin.cmdi.oai.harvester.HarvesterException;
18import eu.clarin.cmdi.oai.harvester.Header;
19import eu.clarin.cmdi.oai.harvester.MetadataFormat;
20import eu.clarin.cmdi.oai.harvester.Repository;
21import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
22import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
23
24final class HarvestJobImpl implements HarvestJob, Delayed {
25    public enum Task {
26        IDENTIFY_REPOSITORY,
27        ENUMERATE_METADATA_FORMATS,
28        HARVEST_RECORDS
29    }
30    private static final class StatisticsImpl implements Statistics {
31        private final String prefix;
32        private long recordCount = 0;
33        private long requestCount = 0;
34        private long resumptionCount = 0;
35        private long bytedTransferred = 0;
36        private Date latestDatestamp = null;
37       
38        private StatisticsImpl(String prefix) {
39            this.prefix = prefix;
40        }
41
42        @Override
43        public String getPrefix() {
44            return prefix;
45        }
46
47        @Override
48        public long getRecordCount() {
49            return recordCount;
50        }
51
52        private void incRecordCount() {
53            this.recordCount++;
54        }
55
56        @Override
57        public long getRequestCount() {
58            return requestCount;
59        }
60
61        private void incRequestCount() {
62            this.requestCount++;
63        }
64
65        @Override
66        public long getResumptionCount() {
67            return resumptionCount;
68        }
69
70        private void incResumptionCount() {
71            this.resumptionCount++;
72        }
73
74        @Override
75        public long getBytesTransferred() {
76            return bytedTransferred;
77        }
78   
79        private void incBytesTransferred(long bytes) {
80            this.bytedTransferred += bytes;
81        }
82
83        @Override
84        public Date getLatestDatestamp() {
85            return latestDatestamp;
86        }
87       
88        private void updateLatestDatestmp(Date latestDatestamp) {
89            if (this.latestDatestamp == null) {
90                this.latestDatestamp = latestDatestamp;
91            } else {
92                if (latestDatestamp.compareTo(this.latestDatestamp) > 0) {
93                    this.latestDatestamp = latestDatestamp;
94                }
95            }
96        }
97    } // inner class Statistics
98    private final AbstractHarvester harvester;
99    private final long id;
100    private final URI repositoryURI;
101    private final HarvestHandler handler;
102    private State state = State.NEW;
103    private Date from;
104    private Date until;
105    private String set;
106    private List<String> metadataPrefixes;
107    private String repositoryName;
108    private String baseURL;
109    private String protocolVersion;
110    private List<String> adminEmail = Collections.emptyList();
111    private String earliestTimestamp;
112    private DeletedNotion deletedNotion;
113    private Granularity granularity;
114    private int compressionMask;
115    private List<MetadataFormat> metadataFormats;
116    private long totalRecordCount = 0;
117    private long totalRequestCount = 0;
118    private long totalBytesTransferred = 0;
119    private long timeTotal = 0;
120    private long timeNetwork = 0;
121    private long timeDelayed = 0;
122    private long timeProcessing = 0;
123    private long startTimestamp;
124    private List<Statistics> statistics = null;
125    private Task task = Task.IDENTIFY_REPOSITORY;
126    private long delayUntil = 0;
127    private int networkRetryCount = 0;
128    private List<String> prefixWorklist = null;
129    private String resumptionToken = null;
130
131    HarvestJobImpl(AbstractHarvester harvester, long id, URI repositoryURI,
132            HarvestHandler handler) {
133        this.harvester = harvester;
134        this.id = id;
135        this.repositoryURI = repositoryURI;
136        this.handler = handler;
137    }
138
139    @Override
140    public long getId() {
141        return id;
142    }
143
144    @Override
145    public State getState() {
146        return state;
147    }
148
149    @Override
150    public boolean isRunning() {
151        return state == HarvestJob.State.RUNNING;
152    }
153
154    @Override
155    public long getTotelRecordCount() {
156        return totalRecordCount;
157    }
158
159    @Override
160    public long getTotelRequestCount() {
161        return totalRequestCount;
162    }
163
164    @Override
165    public long getTotalBytesTransferred() {
166        return totalBytesTransferred;
167    }
168
169    @Override
170    public long getTotalTime() {
171        return timeTotal;
172    }
173
174    @Override
175    public long getNetworkTime() {
176        return timeNetwork;
177    }
178   
179    @Override
180    public long getWaitTime() {
181        return timeDelayed;
182    }
183
184    @Override
185    public long getProcessingTime() {
186        return timeProcessing;
187    }
188
189   
190    @Override
191    public HarvestHandler getHarvestHandler() {
192        return handler;
193    }
194
195    @Override
196    public Date getFrom() {
197        return from;
198    }
199   
200    @Override
201    public void setFrom(Date from) {
202        if (state != State.NEW) {
203            throw new IllegalStateException("state != new");
204        }
205        this.from = from;
206    }
207   
208    @Override
209    public Date getUntil() {
210        return until;
211    }
212   
213    @Override
214    public void setUntil(Date until) {
215        if (state != State.NEW) {
216            throw new IllegalStateException("state != new");
217        }
218        this.until = until;
219    }
220
221    @Override
222    public String getSet() {
223        return set;
224    }
225
226    @Override
227    public void setSet(String set) {
228        if (state != State.NEW) {
229            throw new IllegalStateException("state != new");
230        }
231        if ((set != null) && !set.isEmpty()) {
232            this.set = set;
233        } else {
234            this.set = null;
235        }
236    }
237
238    @Override
239    public List<String> getMetadataPrefixes() {
240        return metadataPrefixes;
241    }
242   
243    @Override
244    public void setMetadataPrefixes(List<String> metadataPrefixes) {
245        if (state != State.NEW) {
246            throw new IllegalStateException("state != new");
247        }
248        if ((metadataPrefixes != null) && !metadataPrefixes.isEmpty()) {
249            this.metadataPrefixes = metadataPrefixes;
250        } else {
251            this.metadataPrefixes = null;
252        }
253    }
254   
255    @Override
256    public Statistics getStatistics(String prefix) {
257        if (prefix == null) {
258            throw new NullPointerException("prefix == null");
259        }
260        if ((state == State.NEW) || (state == State.RUNNING)) {
261            throw new IllegalStateException("invalid state");
262        }
263        for (Statistics stats : statistics) {
264            if (prefix.equals(stats.getPrefix())) {
265                return stats;
266            }
267        }
268        return null;
269    }
270
271    @Override
272    public List<Statistics> getStatistics() {
273        if ((state == State.NEW) || (state == State.RUNNING)) {
274            throw new IllegalStateException("invalid state");
275        }
276        if ((statistics != null) && !statistics.isEmpty()) {
277            return Collections.unmodifiableList(statistics);
278        } else {
279            return null;
280        }
281    }
282
283    @Override
284    public void run() throws HarvesterException {
285        harvester.doRunJob(this);
286    }
287
288    @Override
289    public void cancel() throws HarvesterException {
290        setState(State.CANCELED);
291        harvester.doCancelJob(this);
292    }
293
294    @Override
295    public long getDelay(TimeUnit unit) {
296        return unit.convert((delayUntil - System.currentTimeMillis()),
297                TimeUnit.MILLISECONDS);
298    }
299
300    @Override
301    public int compareTo(Delayed o) {
302        final long me = getDelay(TimeUnit.MICROSECONDS);
303        final long they = o.getDelay(TimeUnit.MILLISECONDS);
304        if (me > they) {
305            return 1;
306        } else if (me < they) {
307            return -1;
308        } else {
309            return 0;
310        }
311    }
312
313    @Override
314    public String toString() {
315        StringBuilder sb = new StringBuilder();
316        sb.append(getClass().getCanonicalName());
317        sb.append("[id=");
318        sb.append(id);
319        sb.append(", state=");
320        sb.append(state);
321        sb.append(", task=");
322        sb.append(task);
323        sb.append("]");
324        return sb.toString();
325    }
326
327    public URI getRepositoryURI() {
328        return repositoryURI;
329    }
330
331    void setState(State state) {
332        if ((this.state != State.RUNNING) && (state == State.RUNNING)) {
333            startTimestamp = System.currentTimeMillis();
334        }
335        if ((this.state == State.RUNNING) && (state != State.RUNNING)) {
336            timeTotal = System.currentTimeMillis() - startTimestamp;
337        }
338        this.state = state;
339    }
340
341    void setRepositoryName(String repositoryName) {
342        this.repositoryName = repositoryName;
343    }
344
345    void setBaseURL(String baseURL) {
346        this.baseURL = baseURL;
347    }
348
349    void setProtocolVersion(String protocolVersion) {
350        this.protocolVersion = protocolVersion;
351    }
352   
353    void setAdminEmail(List<String> adminEmail) {
354        this.adminEmail = adminEmail;
355    }
356
357    void setEarliestTimestamp(String earliestTimestamp) {
358        this.earliestTimestamp = earliestTimestamp;
359    }
360
361    void setDeletedNotion(DeletedNotion deletedNotion) {
362        this.deletedNotion = deletedNotion;
363    }
364
365    void setGranularity(Granularity granularity) {
366        this.granularity = granularity;
367    }
368
369    Granularity getGranularity() {
370        return granularity;
371    }
372
373    void setCompressionMask(int compressionMask) {
374        this.compressionMask = compressionMask;
375    }
376   
377    int getCompressionMask() {
378        return compressionMask;
379    }
380
381    void incRequestCount() {
382        totalRequestCount++;
383        final StatisticsImpl stats = getCurrentStatistics();
384        if (stats != null) {
385            stats.incRequestCount();
386        }
387    }
388
389    void incResumptionCount() {
390        final StatisticsImpl stats = getCurrentStatistics();
391        if (stats != null) {
392            stats.incResumptionCount();
393        }
394    }
395
396    void addToNetworkTime(long delta) {
397        timeNetwork += delta;
398    }
399
400    void finishRequest(long bytesTransferred, long timeProcessingDelta) {
401        timeProcessing += timeProcessingDelta;
402        totalBytesTransferred += bytesTransferred;
403        final StatisticsImpl stats = getCurrentStatistics();
404        if (stats != null) {
405            stats.incBytesTransferred(bytesTransferred);
406        }
407    }
408
409    boolean isState(State state) {
410        return this.state == state;
411    }
412
413    Task getTask() {
414        return task;
415    }
416
417    void setTask(Task task) {
418        this.task = task;
419    }
420
421    String getCurrentPrefix() {
422        if (prefixWorklist == null) {
423            prefixWorklist = new ArrayList<String>();
424            if (metadataPrefixes != null) {
425                prefixWorklist.addAll(metadataPrefixes);
426            } else if (metadataFormats != null) {
427                for (MetadataFormat format : metadataFormats) {
428                    prefixWorklist.add(format.getPrefix());
429                }
430            } else {
431                prefixWorklist.add("oai_dc");
432            }
433        }
434        return prefixWorklist.isEmpty() ? null : prefixWorklist.get(0);
435    }
436
437    boolean removeCurrentPrefix() {
438        prefixWorklist.remove(0);
439        return !prefixWorklist.isEmpty();
440    }
441
442    String getResumptionToken() {
443        return resumptionToken;
444    }
445   
446    void setResumptionToken(String resumptionToken) {
447        this.resumptionToken = resumptionToken;
448    }
449
450    boolean isHarvestingNewPrefix() {
451        return (resumptionToken == null) && (networkRetryCount == 0);
452    }
453
454    int incNetworkRetryCount() {
455        return ++networkRetryCount;
456    }
457   
458    int getNetworkRetryCount() {
459        return networkRetryCount;
460    }
461
462    void resetNetworkRetryCount() {
463        delayUntil = 0;
464        networkRetryCount = 0;
465    }
466
467    void setNetworkRequestDelay(long delay) {
468        this.timeDelayed += delay;
469        this.delayUntil = System.currentTimeMillis() + delay;
470    }
471
472    void onStartListingRecords() {
473        final String prefix = getCurrentPrefix();
474        if (statistics == null) {
475            statistics = new ArrayList<Statistics>();
476        }
477        statistics.add(0, new StatisticsImpl(prefix));
478
479        if (handler != null) {
480            handler.onStartListingRecords(prefix);
481        }
482    }
483
484    void onFinishListingRecords() {
485        Date latestDatestamp = null;
486
487        final StatisticsImpl stats = getCurrentStatistics();
488        if (stats != null) {
489            latestDatestamp = stats.getLatestDatestamp();
490        }
491        if (handler != null) {
492            handler.onFinishListingRecords(getCurrentPrefix(), latestDatestamp);
493        }
494        // reset ...
495        resumptionToken = null;
496    }
497
498    void onRecord(Header header) {
499        totalRecordCount++;
500        StatisticsImpl stats = getCurrentStatistics();
501        if (stats != null) {
502            stats.incRecordCount();
503            stats.updateLatestDatestmp(header.getDatestamp());
504        }
505        if (handler != null) {
506            handler.onRecord(header);
507        }
508    }
509
510    void onRecordMetadata(Header header, XMLStreamReader reader) {
511        if (handler != null) {
512            handler.onRecordMetadata(header, reader);
513        }
514    }
515
516    void onRecordAbout(Header header, XMLStreamReader reader) {
517        if (handler != null) {
518            handler.onRecordAbout(header, reader);
519        }
520    }
521
522    void onIdentify() {
523        if (handler != null) {
524            final Repository repository = new Repository() {
525                @Override
526                public String getRepositoryName() {
527                    return repositoryName;
528                }
529               
530                @Override
531                public String getProtocolVersion() {
532                    return protocolVersion;
533                }
534               
535                @Override
536                public Granularity getGranularity() {
537                    return granularity;
538                }
539               
540                @Override
541                public String getEarliestTimestamp() {
542                    return earliestTimestamp;
543                }
544               
545                @Override
546                public DeletedNotion getDeletedNotion() {
547                    return deletedNotion;
548                }
549               
550                @Override
551                public int getCompressionMask() {
552                    return compressionMask;
553                }
554               
555                @Override
556                public String getBaseURL() {
557                    return baseURL;
558                }
559               
560                @Override
561                public List<String> getAdminEmail() {
562                    return adminEmail;
563                }
564            };
565            handler.onIdentify(repository);
566        }
567    }
568
569    void onListMetadataFormats(List<MetadataFormat> metadataFormats) {
570        this.metadataFormats = metadataFormats;
571        if (handler != null) {
572            handler.onListMetadataFormats(this.metadataFormats);
573        }
574    }
575
576    InputStream wrap(InputStream stream) throws IOException {
577        if (handler != null) {
578            return handler.wrap(stream);
579        } else {
580            return stream;
581        }
582    }
583
584    private StatisticsImpl getCurrentStatistics() {
585        if ((statistics != null) && !statistics.isEmpty()) {
586            return (StatisticsImpl) statistics.get(0);
587        }
588        return null;
589    }
590
591} // class HarvestJobImpl
Note: See TracBrowser for help on using the repository browser.