source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java @ 3021

Last change on this file since 3021 was 3021, checked in by oschonef, 11 years ago
  • creative hanbdling of resumptionToken for non-conforming OAI providers
  • Property svn:eol-style set to native
File size: 24.0 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.net.URI;
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.List;
8
9import javax.xml.namespace.QName;
10import javax.xml.stream.XMLStreamException;
11
12import org.apache.http.Header;
13import org.apache.http.HttpResponse;
14import org.apache.http.HttpStatus;
15import org.apache.http.client.HttpClient;
16import org.apache.http.client.methods.HttpGet;
17import org.apache.http.client.utils.HttpClientUtils;
18import org.apache.http.impl.client.DefaultHttpClient;
19import org.apache.http.impl.cookie.DateParseException;
20import org.apache.http.impl.cookie.DateUtils;
21import org.apache.http.params.CoreConnectionPNames;
22import org.apache.http.params.CoreProtocolPNames;
23import org.apache.http.params.HttpParams;
24import org.apache.http.util.EntityUtils;
25import org.joda.time.DateTime;
26import org.joda.time.DateTimeZone;
27import org.joda.time.MutableDateTime;
28import org.joda.time.format.DateTimeFormat;
29import org.joda.time.format.DateTimeFormatter;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import eu.clarin.cmdi.oai.harvester.Description;
34import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
35import eu.clarin.cmdi.oai.harvester.HarvesterException;
36import eu.clarin.cmdi.oai.harvester.HarvesterProtocolErrorException;
37import eu.clarin.cmdi.oai.harvester.MetadataFormat;
38import eu.clarin.cmdi.oai.harvester.ProtocolError;
39import eu.clarin.cmdi.oai.harvester.Repository;
40import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
41import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
42import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser;
43import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;
44
45
46public class HarvestWorker {
47    private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/";
48    private static final String ARG_METADATA_PREFIX = "metadataPrefix";
49    private static final String ARG_RESUMPTION_TOKEN = "resumptionToken";
50    private static final String PROTOCOL_VERSION = "2.0";
51    private static final String DELETED_NOTION_NO = "no";
52    private static final String DELETED_NOTION_PERSISTENT = "persistent";
53    private static final String DELETED_NOTION_TRANSIENT = "transient";
54    private static final String GRANULARITY_DAYS = "YYYY-MM-DD";
55    private static final String GRANULARITY_SECONDS = "YYYY-MM-DDThh:mm:ssZ";
56    private static final String COMPRESSION_DEFLATE = "deflate";
57    private static final String COMPRESSION_GZIP = "gzip";
58    private static final String VERB_IDENTIFY = "Identify";
59    private static final String VERB_LIST_METADATAFORMATS = "ListMetadataFormats";
60    private static final String VERB_LIST_RECORDS = "ListRecords";
61    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
62    private static final String HEADER_CONNECTION = "Connection";
63    private static final String DATEFORMAT_SECONDS = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'";
64    private static final String DATEFORMAT_DAYS = "yyyy'-'MM'-'dd";
65    private static final String[] DATEFORMATS =
66        { DATEFORMAT_SECONDS, DATEFORMAT_DAYS };
67    private static final Logger logger =
68            LoggerFactory.getLogger(HarvestWorker.class);
69    private final AbstractHarvester harvester;
70    private final HttpClient client;
71    private final RequestUriBuilder uriBuilder;
72    private final HeaderImpl header;
73    private final XmlStreamReaderProxy readerWrapper;
74    private long delay;
75
76
77    HarvestWorker(AbstractHarvester harvester) {
78        this.harvester = harvester;
79        // http client
80        client = new DefaultHttpClient();
81        client.getParams().setParameter(CoreProtocolPNames.USER_AGENT,
82                "eu.clarin.cmdi.oai.Harvester/0.0.1");
83        // request stuff
84        uriBuilder = new RequestUriBuilder();
85        // other stuff
86        this.header        = new HeaderImpl();
87        this.readerWrapper = new XmlStreamReaderProxy();
88    }
89
90
91    long process(HarvestJobImpl job, int maxNetworkTries)
92            throws HarvesterException {
93        if (job.isState(State.CANCELED)) {
94            return -1;
95        }
96        if (!job.isState(State.RUNNING)) {
97            throw new HarvesterException("job is not in running state");
98        }
99
100        boolean ok;
101        try {
102            switch (job.getTask()) {
103            case IDENTIFY_REPOSITORY:
104                ok = doIdentifyTask(job);
105                if (ok) {
106                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
107                }
108                break;
109            case ENUMERATE_METADATA_FORMATS:
110                ok = doListMetadataFormatsTask(job);
111                if (ok) {
112                    job.setTask(Task.HARVEST_RECORDS);
113                }
114                break;
115            case HARVEST_RECORDS:
116                // check for new prefix and notify start, if needed
117                if (job.isHarvestingNewPrefix()) {
118                    job.onStartListingRecords();
119                }
120                ok = doListRecordsTask(job);
121                if (ok) {
122                    // notify end of prefix
123                    job.onFinishListingRecords();
124                    if (!job.removeCurrentPrefix()) {
125                        job.setState(State.DONE);
126                    }
127                } else if ((job.getResumptionToken() != null) && (delay == 0)) {
128                    /*
129                     * special case: request successful, but list was split due
130                     * to OAI flow control
131                     */
132                    ok = true;
133                }
134                break;
135            default:
136                throw new HarvesterException("internal error");
137            } // switch
138
139            if (ok) {
140                job.resetNetworkRetryCount();
141            } else {
142                if (delay > 0) {
143                    job.setNetworkRequestDelay(delay);
144                }
145                if (job.incNetworkRetryCount() >= maxNetworkTries) {
146                    throw new HarvesterException("too many network retries: " +
147                            job.getNetworkRetryCount());
148                }
149            }
150            return delay;
151        } catch (Throwable e) {
152            job.setState(State.ERROR);
153            if (e instanceof HarvesterException) {
154                throw (HarvesterException) e;
155            } else if (e instanceof XMLStreamException) {
156                throw new HarvesterException("malformed OAI response", e);
157            } else if (e instanceof IOException) {
158                throw new HarvesterException("network error", e);
159            } else {
160                throw new HarvesterException("unexpected exception", e);
161            }
162        }
163    }
164
165
166    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
167            XMLStreamException, HarvesterException {
168        Response response = null;
169        try {
170            uriBuilder.setVerb(VERB_IDENTIFY);
171            response = execute(job, false);
172            if (response == null) {
173                return false;
174            }
175
176            response.readStart(OAI_NS, "Identify", true);
177
178            String s = response.readContent(OAI_NS, "repositoryName", true);
179            job.setRepositoryName(s);
180
181            s = response.readContent(OAI_NS, "baseURL", true);
182            job.setBaseURL(s);
183
184            s = response.readContent(OAI_NS, "protocolVersion", true);
185            if (!PROTOCOL_VERSION.equals(s)) {
186                throw new HarvesterException(
187                        "unsupported protocol version: " + s);
188            }
189            job.setProtocolVersion(PROTOCOL_VERSION);
190
191            List<String> adminEmail = null;
192            do {
193                s = response.readContent(OAI_NS,
194                        "adminEmail", (adminEmail == null));
195                if (s != null) {
196                    if (adminEmail == null) {
197                        adminEmail = new ArrayList<String>();
198                    }
199                    adminEmail.add(s);
200                }
201            } while (s != null);
202            job.setAdminEmail(adminEmail);
203
204            /*
205             * defer setting of earliestDatestamp in job until we know the
206             * datestamp granularity of the repository
207             */
208            final String earliestDatestamp =
209                    response.readContent(OAI_NS, "earliestDatestamp", true);
210
211            s = response.readContent(OAI_NS, "deletedRecord", true);
212            if (DELETED_NOTION_NO.equals(s)) {
213                job.setDeletedNotion(DeletedNotion.NO);
214            } else if (DELETED_NOTION_PERSISTENT.equals(s)) {
215                job.setDeletedNotion(DeletedNotion.PERSISTENT);
216            } else if (DELETED_NOTION_TRANSIENT.equals(s)) {
217                job.setDeletedNotion(DeletedNotion.TRANSIENT);
218            } else {
219                throw new HarvesterException(
220                        "invalid value for 'deletedRecord': " + s);
221            }
222
223            s = response.readContent(OAI_NS, "granularity", true);
224            Granularity granularity = null;
225            if (GRANULARITY_DAYS.equals(s)) {
226                granularity = Granularity.DAYS;
227            } else if (GRANULARITY_SECONDS.equals(s)) {
228                granularity = Granularity.SECONDS;
229            } else {
230                throw new HarvesterException(
231                        "invalid value for 'granularity': " + s);
232            }
233            job.setGranularity(granularity);
234            job.setEarliestTimestamp(parseDate(granularity, earliestDatestamp));
235
236            int mask = 0;
237            do {
238                s = response.readContent(OAI_NS, "compression", false, false);
239                if (s != null) {
240                    if (s.isEmpty()) {
241                        logger.warn("skipping empty <compression> element " +
242                                "in Identify response");
243                    } else if (COMPRESSION_DEFLATE.equals(s)) {
244                        mask |= Repository.COMPRESSION_METHOD_DEFLATE;
245                    } else if (COMPRESSION_GZIP.equals(s)) {
246                        mask |= Repository.COMPRESSION_METHOD_GZIP;
247                    } else {
248                        logger.info("ignoring unsupported compression " +
249                                "method '{}'", s);
250                    }
251                }
252            } while (s != null);
253            job.setCompressionMask(mask);
254
255            List<Description> descriptions = null;
256            while (response.readStart(OAI_NS, "description", false)) {
257                response.consumeWhitespace();
258                final String namespaceURI = response.readNamespaceURI();
259                final String localName = response.peekElementLocalName();
260                DescriptionParser parser = harvester.findDescriptionParser(
261                        namespaceURI, localName);
262                if (parser != null) {
263                    Description desc = parser.parseDescription(
264                            response.getXMLStreamReader());
265                    if (desc != null) {
266                        if (descriptions == null) {
267                            descriptions = new ArrayList<Description>();
268                        }
269                        descriptions.add(desc);
270                    }
271                } else {
272                    logger.info("skipping <description> for {}", new QName(
273                            namespaceURI, localName));
274                }
275                response.readEnd(OAI_NS, "description", true);
276                job.setDescriptions(descriptions);
277            }
278            response.readEnd(OAI_NS, "Identify");
279            response.close();
280
281            job.onIdentify();
282
283            return true;
284        } finally {
285            if (response != null) {
286                response.release(job);
287            }
288        }
289    }
290
291
292    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
293            throws IOException, XMLStreamException, HarvesterException {
294        Response response = null;
295        try {
296            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
297            response = execute(job, true);
298            if (response == null) {
299                return false;
300            }
301
302            response.readStart(OAI_NS, "ListMetadataFormats", true);
303            List<MetadataFormat> metadataFormats =
304                    new ArrayList<MetadataFormat>();
305            while (response.readStart(OAI_NS, "metadataFormat",
306                    metadataFormats.isEmpty())) {
307                final String prefix =
308                        response.readContent(OAI_NS, "metadataPrefix", true);
309                final String schema =
310                        response.readContent(OAI_NS, "schema", true);
311                final String namespace =
312                        response.readContent(OAI_NS, "metadataNamespace", true);
313                metadataFormats.add(
314                        new MetadataFormatImpl(prefix, schema, namespace));
315                response.readEnd(OAI_NS, "metadataFormat");
316            } // while
317            response.readEnd(OAI_NS, "ListMetadataFormats");
318            response.close();
319
320            job.onListMetadataFormats(metadataFormats);
321
322            return true;
323        } finally {
324            if (response != null) {
325                response.release(job);
326            }
327        }
328    }
329
330
331    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
332            XMLStreamException, HarvesterException {
333        Response response = null;
334        try {
335            String token = job.getResumptionToken();
336            uriBuilder.setVerb(VERB_LIST_RECORDS);
337            if (token == null) {
338                uriBuilder.addParameter(ARG_METADATA_PREFIX,
339                        job.getCurrentPrefix());
340                if (job.getFrom() != null) {
341                    uriBuilder.addParameter("from",
342                            formatDate(job.getGranularity(), job.getFrom()));
343                }
344                if (job.getUntil() != null) {
345                    uriBuilder.addParameter("until",
346                            formatDate(job.getGranularity(), job.getUntil()));
347                }
348                if (job.getSet() != null) {
349                    uriBuilder.addParameter("set", job.getSet());
350                }
351            } else {
352                uriBuilder.addParameter(ARG_RESUMPTION_TOKEN, token);
353                job.incResumptionCount();
354            }
355
356            response = execute(job, true);
357            if (response == null) {
358                return false;
359            }
360
361            response.readStart(OAI_NS, "ListRecords", true);
362            header.reset();
363            boolean first = true;
364            while (response.readStart(OAI_NS, "record", first)) {
365                boolean deleted = false;
366
367                response.readStart(OAI_NS, "header", true, true);
368                String s = response.readAttributeValue(null, "status");
369                if (s != null) {
370                    if ("deleted".equals(s)) {
371                        header.setDeleted(true);
372                        deleted = true;
373                    } else {
374                        throw new XMLStreamException("attribute 'status' of "
375                                + "element 'record' must contain 'deleted'",
376                                response.getLocation());
377                    }
378                }
379                response.consumeStart();
380                s = response.readContent(OAI_NS, "identifier", true);
381                header.setIdentifier(s);
382                s = response.readContent(OAI_NS, "datestamp", true);
383                header.setDatestamp(parseDate(job.getGranularity(), s));
384                while (response.readStart(OAI_NS, "setSpec", false)) {
385                    header.addSetSpec(response.readString(true));
386                    response.readEnd(OAI_NS, "setSpec", true);
387                }
388                response.readEnd(OAI_NS, "header");
389                job.onRecord(header);
390                /*
391                 * only parse metadata from non-deleted records
392                 */
393                if (!deleted) {
394                    if (response.readStart(OAI_NS, "metadata", true)) {
395                        response.consumeWhitespace();
396                        readerWrapper.reset(response.getXMLStreamReader());
397                        job.onRecordMetadata(header, readerWrapper);
398                        response.consumeWhitespace();
399                        response.readEnd(OAI_NS, "metadata");
400                    }
401                }
402                while (response.readStart(OAI_NS, "about", false)) {
403                    response.consumeWhitespace();
404                    job.onRecordAbout(header, null);
405                    response.consumeWhitespace();
406                    response.readEnd(OAI_NS, "about", true);
407                } // while (about)
408                response.readEnd(OAI_NS, "record");
409                first = false;
410                header.reset();
411            } // while (record)
412
413            /*
414             * expect resumptionToken (possibly empty) when request was resumed
415             */
416            String nextToken = null;
417            boolean got_token = false;
418            if (response.readStart(OAI_NS, "resumptionToken", false, true)) {
419                // XXX: read attributes here!
420                response.consumeStart();
421
422                nextToken = response.readString(false);
423                if (nextToken != null) {
424                    if (!nextToken.isEmpty()) {
425                        job.setResumptionToken(nextToken);
426                    } else {
427                        nextToken = null;
428                    }
429                }
430                response.readEnd(OAI_NS, "resumptionToken");
431                got_token = true;
432            }
433            if ((token != null) && (nextToken == null) && !got_token) {
434                logger.error("An <resumptionToken> element was expected, " +
435                        "but none was found; assuming a complete result list.");
436                logger.error("An OAI provider is required to serialize an " +
437                        "empty <resumptionToken> element to announce that " +
438                        "the result list is complete.");
439            }
440            response.readEnd(OAI_NS, "ListRecords");
441
442            response.close();
443
444            return (nextToken == null);
445        } catch (HarvesterProtocolErrorException e) {
446            /*
447             * XXX: handle NO_RECORDS_MATCH protocol error
448             */
449            List<ProtocolError> errors = e.getErrors();
450            if ((errors != null) && !errors.isEmpty()) {
451                for (ProtocolError error : errors) {
452                    if (error.isCode(ProtocolError.Code.NO_RECORDS_MATCH)) {
453                        logger.debug("no records match prefix '{}'",
454                                job.getCurrentPrefix());
455                        return true;
456                    }
457                }
458            }
459            throw e;
460        } finally {
461            if (response != null) {
462                response.release(job);
463            }
464        }
465    }
466
467
468    private Response execute(HarvestJobImpl job, boolean compress)
469            throws IOException, XMLStreamException, HarvesterException {
470        final long now = System.currentTimeMillis();
471        job.incRequestCount();
472        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
473        logger.debug("Performing HTTP GET request to {}", uri);
474        final HttpGet request = new HttpGet(uri);
475        if (compress) {
476            final int mask = job.getCompressionMask();
477            if ((mask & Repository.COMPRESSION_METHOD_GZIP) > 0) {
478                request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
479            }
480            if ((mask & Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
481                request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
482            }
483            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
484        }
485        request.addHeader(HEADER_CONNECTION, "close");
486        // FIXME: get timeout values from somewhere?
487        HttpParams params = request.getParams();
488        params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000);
489        params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
490
491        final HttpResponse response = client.execute(request);
492        /*
493         * FIXME: the following code need serious re-factoring
494         */
495        boolean close = false;
496        try {
497            job.addToNetworkTime(System.currentTimeMillis() - now);
498            final int status = response.getStatusLine().getStatusCode();
499            if (status == HttpStatus.SC_OK) {
500                delay = 0;
501                return new Response(job, response, harvester);
502            } else {
503                if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
504                    delay = parseTryAfter(response);
505                    logger.debug("got service unavailable status, retrying " +
506                            "after {} seconds", delay);
507                    close = true;
508                    return null;
509                } else {
510                    throw new HarvesterException(
511                            "provider returned unexpected HTTP status: " +
512                                    status);
513                }
514            }
515        } catch (IOException e) {
516            close = true;
517            throw e;
518        } catch (XMLStreamException e) {
519            close = true;
520            throw e;
521        } catch (HarvesterException e) {
522            close = true;
523            throw e;
524        } finally {
525            if (close) {
526                /*
527                 * try hard to release HTTP client resources ...
528                 */
529                try {
530                    EntityUtils.consume(response.getEntity());
531                } catch (IOException ex) {
532                    /* IGNORE */
533                }
534
535                /* make sure to release allocated resources */
536                HttpClientUtils.closeQuietly(response);
537
538                if (request != null) {
539                    request.abort();
540                }
541            }
542        }
543    }
544
545
546    private long parseTryAfter(HttpResponse response) {
547        long delay = -1;
548        final Header s = response.getFirstHeader("Retry-After");
549        if (s != null) {
550            final String v = s.getValue();
551            try {
552                /*
553                 * if plain number, it is the time in seconds
554                 */
555                delay = Integer.parseInt(v) * 1000;
556            } catch (NumberFormatException e) {
557                try {
558                    Date date = DateUtils.parseDate(v);
559                    delay = date.getTime() - System.currentTimeMillis();
560                } catch (DateParseException f) {
561                    /* IGNORE */
562                }
563            }
564        }
565        return (delay > 0) ? delay : 0;
566    }
567
568
569    private static Date parseDate(Repository.Granularity granularity, String s)
570            throws HarvesterException {
571        final MutableDateTime date = new MutableDateTime(0);
572        for (int i = 0; i < DATEFORMATS.length; i++) {
573            DateTimeFormatter fmt = DateTimeFormat
574                    .forPattern(DATEFORMATS[i])
575                    .withZone(DateTimeZone.UTC);
576            if (fmt.parseInto(date, s, 0) == s.length()) {
577                if (DATEFORMATS[i].equals(DATEFORMAT_SECONDS) &&
578                        (granularity == Repository.Granularity.DAYS)) {
579                    logger.warn("repository announced DAYS granularity but " +
580                            "provided timestamp with SECONDS granularity");
581                }
582                if (DATEFORMATS[i].equals(DATEFORMAT_DAYS)) {
583                    date.setTime(0, 0, 0, 0);
584                }
585                return date.toDate();
586            }
587        }
588        throw new HarvesterException("invalid date: " + s);
589    }
590
591
592    private static String formatDate(Repository.Granularity granularity,
593            Date date) throws HarvesterException {
594        DateTimeFormatter fmt = null;
595        switch (granularity) {
596        case DAYS:
597            fmt = DateTimeFormat.forPattern(DATEFORMAT_DAYS)
598                .withZone(DateTimeZone.UTC);
599            break;
600        case SECONDS:
601            fmt = DateTimeFormat.forPattern(DATEFORMAT_SECONDS)
602                .withZone(DateTimeZone.UTC);
603            break;
604        default:
605            /* cannot happen, but silence FindBugs warning */
606            throw new HarvesterException("invalid granularity: " + granularity);
607        }
608        return fmt.print(new DateTime(date));
609    }
610
611} // class HarvestWorker
Note: See TracBrowser for help on using the repository browser.