source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java @ 1163

Last change on this file since 1163 was 1163, checked in by oschonef, 13 years ago
  • convert ealiestDatestamp to Date in Identify
  • add pluggable parser architecture for additional descriptions in Identify
  • add description parsers for OAI-identifier and OAI Dublin Core descriptions
  • Property svn:eol-style set to native
File size: 20.2 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.net.URI;
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.List;
8
9import javax.xml.stream.XMLStreamException;
10
11import org.apache.http.Header;
12import org.apache.http.HttpResponse;
13import org.apache.http.HttpStatus;
14import org.apache.http.client.methods.HttpGet;
15import org.apache.http.impl.client.DefaultHttpClient;
16import org.apache.http.impl.cookie.DateParseException;
17import org.apache.http.impl.cookie.DateUtils;
18import org.apache.http.params.CoreConnectionPNames;
19import org.apache.http.params.CoreProtocolPNames;
20import org.apache.http.params.HttpParams;
21import org.joda.time.DateTime;
22import org.joda.time.DateTimeZone;
23import org.joda.time.MutableDateTime;
24import org.joda.time.format.DateTimeFormat;
25import org.joda.time.format.DateTimeFormatter;
26
27import eu.clarin.cmdi.oai.harvester.Description;
28import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
29import eu.clarin.cmdi.oai.harvester.HarvesterException;
30import eu.clarin.cmdi.oai.harvester.MetadataFormat;
31import eu.clarin.cmdi.oai.harvester.Repository;
32import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
33import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
34import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser;
35import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;
36
37public class HarvestWorker {
38    private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/";
39    private static final String ARG_METADATA_PREFIX = "metadataPrefix";
40    private static final String ARG_RESUMPTION_TOKEN = "resumptionToken";
41    private static final String PROTOCOL_VERSION = "2.0";
42    private static final String DELETED_NOTION_NO = "no";
43    private static final String DELETED_NOTION_PERSISTENT = "persistent";
44    private static final String DELETED_NOTION_TRANSIENT = "transient";
45    private static final String GRANULARITY_DAYS = "YYYY-MM-DD";
46    private static final String GRANULARITY_SECONDS = "YYYY-MM-DDThh:mm:ssZ";
47    private static final String COMPRESSION_DEFLATE = "deflate";
48    private static final String COMPRESSION_GZIP = "gzip";
49    private static final String VERB_IDENTIFY = "Identify";
50    private static final String VERB_LIST_METADATAFORMATS =
51        "ListMetadataFormats";
52    private static final String VERB_LIST_RECORDS = "ListRecords";
53    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
54    private static final String HEADER_CONNECTION = "Connection";
55    private static final String[] DATEFORMATS_DAYS =
56        { "yyyy'-'MM'-'dd" };
57    private static final String[] DATEFORMATS_FULL =
58        { "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'", "yyyy'-'MM'-'dd" };
59    private final AbstractHarvester harvester;
60    private final DefaultHttpClient client;
61    private final RequestUriBuilder uriBuilder;
62    private final HeaderImpl header;
63    private final XmlStreamReaderProxy readerWrapper;
64    private long delay;
65
66    HarvestWorker(AbstractHarvester harvester) {
67        this.harvester = harvester;
68        // http client
69        client = new DefaultHttpClient();
70        client.getParams().setParameter(CoreProtocolPNames.USER_AGENT,
71                    "eu.clarin.cmdi.oai.Harvester/0.0.1");
72        // request stuff
73        uriBuilder = new RequestUriBuilder();
74        // other stuff
75        this.header = new HeaderImpl();
76        this.readerWrapper = new XmlStreamReaderProxy();
77    }
78
79    long process(HarvestJobImpl job, int maxNetworkTries)
80            throws HarvesterException {
81        if (job.isState(State.CANCELED)) {
82            return -1;
83        }
84        if (!job.isState(State.RUNNING)) {
85            throw new HarvesterException("job is not in running state");
86        }
87
88        boolean ok;
89        try {
90            switch (job.getTask()) {
91            case IDENTIFY_REPOSITORY:
92                ok = doIdentifyTask(job);
93                if (ok) {
94                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
95                }
96                break;
97            case ENUMERATE_METADATA_FORMATS:
98                ok = doListMetadataFormatsTask(job);
99                if (ok) {
100                    job.setTask(Task.HARVEST_RECORDS);
101                }
102                break;
103            case HARVEST_RECORDS:
104                // check for new prefix and notify start, if needed
105                if (job.isHarvestingNewPrefix()) {
106                    job.onStartListingRecords();
107                }
108                ok = doListRecordsTask(job);
109                if (ok) {
110                    // notify end of prefix
111                    job.onFinishListingRecords();
112                    if (!job.removeCurrentPrefix()) {
113                        job.setState(State.DONE);
114                    }
115                } else if ((job.getResumptionToken() != null) && (delay == 0)) {
116                    /*
117                     * special case: request successful, but list was split
118                     * die to OAI flow control
119                     */
120                    ok = true;
121                }
122                break;
123            default:
124                throw new HarvesterException("internal error");
125            } // switch
126
127            if (ok) {
128                job.resetNetworkRetryCount();
129            } else {
130                if (delay > 0) {
131                    job.setNetworkRequestDelay(delay);
132                }
133                if (job.incNetworkRetryCount() >= maxNetworkTries) {
134                    throw new HarvesterException("too many network retries: " +
135                            job.getNetworkRetryCount());
136                }
137            }
138            return delay;
139        } catch (Throwable e) {
140            job.setState(State.ERROR);
141            if (e instanceof HarvesterException) {
142                throw (HarvesterException) e;
143            } else if (e instanceof XMLStreamException) {
144                throw new HarvesterException("malformed OAI response", e);
145            } else if (e instanceof IOException) {
146                throw new HarvesterException("network error", e);
147            } else {
148                throw new HarvesterException("unexpected exception", e);
149            }
150        }
151    }
152
153    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
154            XMLStreamException, HarvesterException {
155        Response response = null;
156        try {
157            uriBuilder.setVerb(VERB_IDENTIFY);
158            response = execute(job, false);
159            if (response == null) {
160                return false;
161            }
162
163            response.readStart(OAI_NS, "Identify", true);
164
165            String s = response.readContent(OAI_NS, "repositoryName", true);
166            job.setRepositoryName(s);
167
168            s = response.readContent(OAI_NS, "baseURL", true);
169            job.setBaseURL(s);
170
171            s = response.readContent(OAI_NS, "protocolVersion", true);
172            if (!PROTOCOL_VERSION.equals(s)) {
173                throw new HarvesterException("unsupported protocol version: " +
174                        s);
175            }
176            job.setProtocolVersion(PROTOCOL_VERSION);
177
178            List<String> adminEmail = null;
179            do {
180                s = response.readContent(OAI_NS, "adminEmail",
181                        (adminEmail == null));
182                if (s != null) {
183                    if (adminEmail == null) {
184                        adminEmail = new ArrayList<String>();
185                    }
186                    adminEmail.add(s);
187                }
188            } while (s != null);
189            job.setAdminEmail(adminEmail);
190
191            /*
192             *  defer setting of earliestDatestamp in job until we know
193             *  the datestamp granularity of the repository
194             */
195            final String earliestDatestamp =
196                response.readContent(OAI_NS, "earliestDatestamp", true);
197
198            s = response.readContent(OAI_NS, "deletedRecord", true);
199            if (DELETED_NOTION_NO.equals(s)) {
200                job.setDeletedNotion(DeletedNotion.NO);
201            } else if (DELETED_NOTION_PERSISTENT.equals(s)) {
202                job.setDeletedNotion(DeletedNotion.PERSISTENT);
203            } else if (DELETED_NOTION_TRANSIENT.equals(s)) {
204                job.setDeletedNotion(DeletedNotion.TRANSIENT);
205            } else {
206                throw new HarvesterException(
207                        "invalid value for 'deletedRecord': " + s);
208            }
209
210            s = response.readContent(OAI_NS, "granularity", true);
211            Granularity granularity = null;
212            if (GRANULARITY_DAYS.equals(s)) {
213                granularity = Granularity.DAYS;
214            } else if (GRANULARITY_SECONDS.equals(s)) {
215                granularity = Granularity.SECONDS;
216            } else {
217                throw new HarvesterException(
218                        "invalid value for 'granularity': " + s);
219            }
220            job.setGranularity(granularity);
221            job.setEarliestTimestamp(parseDate(granularity, earliestDatestamp));
222
223            int mask = 0;
224            do {
225                s = response.readContent(OAI_NS, "compression", false);
226                if (s != null) {
227                    if (COMPRESSION_DEFLATE.equals(s)) {
228                        mask |= Repository.COMPRESSION_METHOD_DEFLATE;
229                    } else if (COMPRESSION_GZIP.equals(s)) {
230                        mask |= Repository.COMPRESSION_METHOD_GZIP;
231                    }
232                }
233            } while (s != null);
234            job.setCompressionMask(mask);
235
236            List<Description> descriptions = null;
237            while (response.readStart(OAI_NS, "description", false)) {
238                response.consumeWhitespace();
239                final String namespaceURI = response.readNamespaceURI();
240                final String localName = response.peekElementLocalName();
241                DescriptionParser parser = harvester.findDescriptionParser(
242                        namespaceURI, localName);
243                if (parser != null) {
244                    Description desc =
245                        parser.parseDescription(response.getXMLStreamReader());
246                    if (desc != null) {
247                        if (descriptions == null) {
248                            descriptions = new ArrayList<Description>();
249                        }
250                        descriptions.add(desc);
251                    }
252                } else {
253                    System.err.println("skipping DESC " + namespaceURI + ", " +
254                            localName);
255                }
256                response.readEnd(OAI_NS, "description", true);
257                job.setDescriptions(descriptions);
258            }
259            response.readEnd(OAI_NS, "Identify");
260            response.close();
261           
262            job.onIdentify();
263
264            return true;
265        } finally {
266            if (response != null) {
267                response.release(job);
268            }
269        }
270    }
271
272    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
273            throws IOException, XMLStreamException, HarvesterException {
274        Response response = null;
275        try {
276            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
277            response = execute(job, true);
278            if (response == null) {
279                return false;
280            }
281
282            response.readStart(OAI_NS, "ListMetadataFormats", true);
283            List<MetadataFormat> metadataFormats =
284                new ArrayList<MetadataFormat>();
285            while (response.readStart(OAI_NS, "metadataFormat",
286                    metadataFormats.isEmpty())) {
287                final String prefix =
288                    response.readContent(OAI_NS, "metadataPrefix", true);
289                final String schema =
290                    response.readContent(OAI_NS, "schema", true);
291                final String namespace =
292                    response.readContent(OAI_NS, "metadataNamespace", true);
293                metadataFormats.add(
294                        new MetadataFormatImpl(prefix, schema, namespace));
295                response.readEnd(OAI_NS, "metadataFormat");
296            } // while
297            response.readEnd(OAI_NS, "ListMetadataFormats");
298            response.close();
299
300            job.onListMetadataFormats(metadataFormats);
301           
302            return true;
303        } finally {
304            if (response != null) {
305                response.release(job);
306            }
307        }
308    }
309
310    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
311            XMLStreamException, HarvesterException {
312        Response response = null;
313        try {
314            String token = job.getResumptionToken();
315            uriBuilder.setVerb(VERB_LIST_RECORDS);
316            if (token == null) {
317                uriBuilder.addParameter(ARG_METADATA_PREFIX,
318                        job.getCurrentPrefix());
319                if (job.getFrom() != null) {
320                    uriBuilder.addParameter("from",
321                            formatDate(job.getGranularity(), job.getFrom()));
322                }
323                if (job.getUntil() != null) {
324                    uriBuilder.addParameter("until",
325                            formatDate(job.getGranularity(), job.getUntil()));
326                }
327                if (job.getSet() != null) {
328                    uriBuilder.addParameter("set", job.getSet());
329                }
330            } else {
331                uriBuilder.addParameter(ARG_RESUMPTION_TOKEN, token);
332                job.incResumptionCount();
333            }
334
335            response = execute(job, true);
336            if (response == null) {
337                return false;
338            }
339
340            String nextToken = null;
341
342            response.readStart(OAI_NS, "ListRecords", true);
343            header.reset();
344            boolean first = true;
345            while (response.readStart(OAI_NS, "record", first)) {
346                response.readStart(OAI_NS, "header", true, true);
347                String s = response.readAttributeValue(null, "status");
348                if (s != null) {
349                    if ("deleted".equals(s)) {
350                        header.setDeleted(true);
351                    } else {
352                        throw new XMLStreamException("attribute 'status' of " +
353                                "element 'record' must contain 'deleted'");
354                    }
355                }
356                response.consumeStart();
357                s = response.readContent(OAI_NS, "identifier", true);
358                header.setIdentifier(s);
359                s = response.readContent(OAI_NS, "datestamp", true);
360                header.setDatestamp(parseDate(job.getGranularity(), s));
361                while (response.readStart(OAI_NS, "setSpec", false)) {
362                    header.addSetSpec(response.readString(true));
363                    response.readEnd(OAI_NS, "setSpec", true);
364                }
365                response.readEnd(OAI_NS, "header");
366                job.onRecord(header);
367                if (response.readStart(OAI_NS, "metadata", true)) {
368                    response.consumeWhitespace();
369                    readerWrapper.reset(response.getXMLStreamReader());
370                    job.onRecordMetadata(header, readerWrapper);
371                    response.consumeWhitespace();
372                    response.readEnd(OAI_NS, "metadata");
373                }
374                while (response.readStart(OAI_NS, "about", false)) {
375                    response.consumeWhitespace();
376                    job.onRecordAbout(header, null);
377                    response.consumeWhitespace();
378                    response.readEnd(OAI_NS, "about", true);
379                } // while (about)
380                response.readEnd(OAI_NS, "record");
381                first = false;
382                header.reset();
383            } // while (record)
384
385            /*
386             * expect resumptionToken (possibly empty) when request was resumed
387             */
388            if (response.readStart(OAI_NS, "resumptionToken",
389                    (token != null), true)) {
390                // XXX: read attributes here!
391                response.consumeStart();
392
393                nextToken = response.readString(false);
394                if (nextToken != null) {
395                    job.setResumptionToken(nextToken);
396                }
397                response.readEnd(OAI_NS, "resumptionToken");
398            }
399            response.readEnd(OAI_NS, "ListRecords");
400           
401            response.close();
402
403            return (nextToken == null);
404        } finally {
405            if (response != null) {
406                response.release(job);
407            }
408        }
409    }
410
411    private Response execute(HarvestJobImpl job, boolean compress)
412            throws IOException, XMLStreamException, HarvesterException {
413        long now = System.currentTimeMillis();
414        job.incRequestCount();
415        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
416        System.err.println("HTTP GET " + uri);
417        final HttpGet request = new HttpGet(uri);
418        if (compress) {
419            if ((job.getCompressionMask() &
420                    Repository.COMPRESSION_METHOD_GZIP) > 0) {
421                request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
422            }
423            if ((job.getCompressionMask() &
424                    Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
425                request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
426            }
427            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
428        }
429        request.addHeader(HEADER_CONNECTION, "close");
430        // FIXME: get timeout values from somewhere?
431        HttpParams params = request.getParams();
432        params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000);
433        params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
434
435        final HttpResponse response = client.execute(request);
436        job.addToNetworkTime(System.currentTimeMillis() - now);
437        int status = response.getStatusLine().getStatusCode();
438        if (status == HttpStatus.SC_OK) {
439            delay = 0;
440            return new Response(job, response.getEntity(), harvester);
441        } else {
442            request.abort();
443            if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
444                System.err.println("-> " + status);
445                delay = parseTryAfter(response);
446            } else {
447                throw new HarvesterException(
448                        "provider returned unexpected HTTP status: " + status);
449            }
450        }
451        return null;
452    }
453
454    private long parseTryAfter(HttpResponse response) {
455        long delay = -1;
456        final Header s = response.getFirstHeader("Retry-After");
457        if (s != null) {
458            final String v = s.getValue();
459            try {
460                /*
461                 * if plain number, it is the time in seconds
462                 */
463                delay = Integer.parseInt(v) * 1000;
464            } catch (NumberFormatException e) {
465                try {
466                    Date date = DateUtils.parseDate(v);
467                    delay = date.getTime() - System.currentTimeMillis();
468                } catch (DateParseException f) {
469                    /* IGNORE */
470                }
471            }
472        }
473        return (delay > 0) ? delay : 0;
474    }
475
476    private Date parseDate(Repository.Granularity granularity, String s)
477            throws HarvesterException {
478        String[] patterns = null;
479        switch (granularity) {
480        case DAYS:
481            patterns = DATEFORMATS_DAYS;
482            break;
483        case SECONDS:
484            patterns = DATEFORMATS_FULL;
485        }
486        MutableDateTime date = new MutableDateTime(0);
487        for (int i = 0; i < patterns.length; i++) {
488            DateTimeFormatter fmt = DateTimeFormat
489                .forPattern(patterns[i])
490                .withZone(DateTimeZone.UTC);
491            if (fmt.parseInto(date, s, 0) == s.length()) {
492                if (patterns[i].equals(DATEFORMATS_DAYS[0])) {
493                    date.setTime(0, 0, 0, 0);
494                }
495                return date.toDate();
496            }
497        }
498        throw new HarvesterException("invalid date: " + s);
499    }
500
501    private String formatDate(Repository.Granularity granularity, Date date) {
502        String pattern = null;
503        switch (granularity) {
504        case DAYS:
505            pattern = DATEFORMATS_DAYS[0];
506            break;
507        case SECONDS:
508            pattern = DATEFORMATS_FULL[0];
509        }
510        DateTimeFormatter fmt =
511            DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.UTC);
512        return fmt.print(new DateTime(date));
513    }
514
515} // class HarvestWorker
Note: See TracBrowser for help on using the repository browser.