source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java @ 3931

Last change on this file since 3931 was 3931, checked in by oschonef, 11 years ago
  • band-aid to skip some invalid metadata records from MPI
  • Property svn:eol-style set to native
File size: 24.4 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.net.URI;
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.List;
8
9import javax.xml.namespace.QName;
10import javax.xml.stream.XMLStreamException;
11
12import org.apache.http.Header;
13import org.apache.http.HttpResponse;
14import org.apache.http.HttpStatus;
15import org.apache.http.client.HttpClient;
16import org.apache.http.client.methods.HttpGet;
17import org.apache.http.client.utils.HttpClientUtils;
18import org.apache.http.impl.client.DefaultHttpClient;
19import org.apache.http.impl.cookie.DateParseException;
20import org.apache.http.impl.cookie.DateUtils;
21import org.apache.http.params.CoreConnectionPNames;
22import org.apache.http.params.CoreProtocolPNames;
23import org.apache.http.params.HttpParams;
24import org.apache.http.util.EntityUtils;
25import org.joda.time.DateTime;
26import org.joda.time.DateTimeZone;
27import org.joda.time.MutableDateTime;
28import org.joda.time.format.DateTimeFormat;
29import org.joda.time.format.DateTimeFormatter;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import eu.clarin.cmdi.oai.harvester.Description;
34import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
35import eu.clarin.cmdi.oai.harvester.HarvesterException;
36import eu.clarin.cmdi.oai.harvester.HarvesterProtocolErrorException;
37import eu.clarin.cmdi.oai.harvester.MetadataFormat;
38import eu.clarin.cmdi.oai.harvester.ProtocolError;
39import eu.clarin.cmdi.oai.harvester.Repository;
40import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
41import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
42import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser;
43import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;
44
45
46public class HarvestWorker {
47    private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/";
48    private static final String ARG_METADATA_PREFIX = "metadataPrefix";
49    private static final String ARG_RESUMPTION_TOKEN = "resumptionToken";
50    private static final String PROTOCOL_VERSION = "2.0";
51    private static final String DELETED_NOTION_NO = "no";
52    private static final String DELETED_NOTION_PERSISTENT = "persistent";
53    private static final String DELETED_NOTION_TRANSIENT = "transient";
54    private static final String GRANULARITY_DAYS = "YYYY-MM-DD";
55    private static final String GRANULARITY_SECONDS = "YYYY-MM-DDThh:mm:ssZ";
56    private static final String COMPRESSION_DEFLATE = "deflate";
57    private static final String COMPRESSION_GZIP = "gzip";
58    private static final String VERB_IDENTIFY = "Identify";
59    private static final String VERB_LIST_METADATAFORMATS = "ListMetadataFormats";
60    private static final String VERB_LIST_RECORDS = "ListRecords";
61    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
62    private static final String HEADER_CONNECTION = "Connection";
63    private static final String DATEFORMAT_SECONDS = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'";
64    private static final String DATEFORMAT_DAYS = "yyyy'-'MM'-'dd";
65    private static final String[] DATEFORMATS =
66        { DATEFORMAT_SECONDS, DATEFORMAT_DAYS };
67    private static final Logger logger =
68            LoggerFactory.getLogger(HarvestWorker.class);
69    private final AbstractHarvester harvester;
70    private final HttpClient client;
71    private final RequestUriBuilder uriBuilder;
72    private final HeaderImpl header;
73    private final XmlStreamReaderProxy readerWrapper;
74    private long delay;
75
76
77    HarvestWorker(AbstractHarvester harvester) {
78        this.harvester = harvester;
79        // http client
80        client = new DefaultHttpClient();
81        client.getParams().setParameter(CoreProtocolPNames.USER_AGENT,
82                "eu.clarin.cmdi.oai.Harvester/0.0.1");
83        // request stuff
84        uriBuilder = new RequestUriBuilder();
85        // other stuff
86        this.header        = new HeaderImpl();
87        this.readerWrapper = new XmlStreamReaderProxy();
88    }
89
90
91    long process(HarvestJobImpl job, int maxNetworkTries)
92            throws HarvesterException {
93        if (job.isState(State.CANCELED)) {
94            return -1;
95        }
96        if (!job.isState(State.RUNNING)) {
97            throw new HarvesterException("job is not in running state");
98        }
99
100        boolean ok;
101        try {
102            switch (job.getTask()) {
103            case IDENTIFY_REPOSITORY:
104                ok = doIdentifyTask(job);
105                if (ok) {
106                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
107                }
108                break;
109            case ENUMERATE_METADATA_FORMATS:
110                ok = doListMetadataFormatsTask(job);
111                if (ok) {
112                    job.setTask(Task.HARVEST_RECORDS);
113                }
114                break;
115            case HARVEST_RECORDS:
116                // check for new prefix and notify start, if needed
117                if (job.isHarvestingNewPrefix()) {
118                    job.onStartListingRecords();
119                }
120                ok = doListRecordsTask(job);
121                if (ok) {
122                    // notify end of prefix
123                    job.onFinishListingRecords();
124                    if (!job.removeCurrentPrefix()) {
125                        job.setState(State.DONE);
126                    }
127                } else if ((job.getResumptionToken() != null) && (delay == 0)) {
128                    /*
129                     * special case: request successful, but list was split due
130                     * to OAI flow control
131                     */
132                    ok = true;
133                }
134                break;
135            default:
136                throw new HarvesterException("internal error");
137            } // switch
138
139            if (ok) {
140                job.resetNetworkRetryCount();
141            } else {
142                if (delay > 0) {
143                    job.setNetworkRequestDelay(delay);
144                }
145                if (job.incNetworkRetryCount() >= maxNetworkTries) {
146                    throw new HarvesterException("too many network retries: " +
147                            job.getNetworkRetryCount());
148                }
149            }
150            return delay;
151        } catch (Throwable e) {
152            job.setState(State.ERROR);
153            if (e instanceof HarvesterException) {
154                throw (HarvesterException) e;
155            } else if (e instanceof XMLStreamException) {
156                throw new HarvesterException("malformed OAI response", e);
157            } else if (e instanceof IOException) {
158                throw new HarvesterException("network error", e);
159            } else {
160                throw new HarvesterException("unexpected exception", e);
161            }
162        }
163    }
164
165
166    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
167            XMLStreamException, HarvesterException {
168        Response response = null;
169        try {
170            uriBuilder.setVerb(VERB_IDENTIFY);
171            response = execute(job, false);
172            if (response == null) {
173                return false;
174            }
175
176            response.readStart(OAI_NS, "Identify", true);
177
178            String s = response.readContent(OAI_NS, "repositoryName", true);
179            job.setRepositoryName(s);
180
181            s = response.readContent(OAI_NS, "baseURL", true);
182            job.setBaseURL(s);
183
184            s = response.readContent(OAI_NS, "protocolVersion", true);
185            if (!PROTOCOL_VERSION.equals(s)) {
186                throw new HarvesterException(
187                        "unsupported protocol version: " + s);
188            }
189            job.setProtocolVersion(PROTOCOL_VERSION);
190
191            List<String> adminEmail = null;
192            do {
193                s = response.readContent(OAI_NS,
194                        "adminEmail", (adminEmail == null));
195                if (s != null) {
196                    if (adminEmail == null) {
197                        adminEmail = new ArrayList<String>();
198                    }
199                    adminEmail.add(s);
200                }
201            } while (s != null);
202            job.setAdminEmail(adminEmail);
203
204            /*
205             * defer setting of earliestDatestamp in job until we know the
206             * datestamp granularity of the repository
207             */
208            final String earliestDatestamp =
209                    response.readContent(OAI_NS, "earliestDatestamp", true);
210
211            s = response.readContent(OAI_NS, "deletedRecord", true);
212            if (DELETED_NOTION_NO.equals(s)) {
213                job.setDeletedNotion(DeletedNotion.NO);
214            } else if (DELETED_NOTION_PERSISTENT.equals(s)) {
215                job.setDeletedNotion(DeletedNotion.PERSISTENT);
216            } else if (DELETED_NOTION_TRANSIENT.equals(s)) {
217                job.setDeletedNotion(DeletedNotion.TRANSIENT);
218            } else {
219                throw new HarvesterException(
220                        "invalid value for 'deletedRecord': " + s);
221            }
222
223            s = response.readContent(OAI_NS, "granularity", true);
224            Granularity granularity = null;
225            if (GRANULARITY_DAYS.equals(s)) {
226                granularity = Granularity.DAYS;
227            } else if (GRANULARITY_SECONDS.equals(s)) {
228                granularity = Granularity.SECONDS;
229            } else {
230                throw new HarvesterException(
231                        "invalid value for 'granularity': " + s);
232            }
233            job.setGranularity(granularity);
234            job.setEarliestTimestamp(parseDate(granularity, earliestDatestamp));
235
236            int mask = 0;
237            do {
238                s = response.readContent(OAI_NS, "compression", false, false);
239                if (s != null) {
240                    if (s.isEmpty()) {
241                        logger.warn("skipping empty <compression> element " +
242                                "in Identify response");
243                    } else if (COMPRESSION_DEFLATE.equals(s)) {
244                        mask |= Repository.COMPRESSION_METHOD_DEFLATE;
245                    } else if (COMPRESSION_GZIP.equals(s)) {
246                        mask |= Repository.COMPRESSION_METHOD_GZIP;
247                    } else {
248                        logger.info("ignoring unsupported compression " +
249                                "method '{}'", s);
250                    }
251                }
252            } while (s != null);
253            job.setCompressionMask(mask);
254
255            List<Description> descriptions = null;
256            while (response.readStart(OAI_NS, "description", false)) {
257                response.consumeWhitespace();
258                final String namespaceURI = response.readNamespaceURI();
259                final String localName = response.peekElementLocalName();
260                DescriptionParser parser = harvester.findDescriptionParser(
261                        namespaceURI, localName);
262                if (parser != null) {
263                    Description desc = parser.parseDescription(
264                            response.getXMLStreamReader());
265                    if (desc != null) {
266                        if (descriptions == null) {
267                            descriptions = new ArrayList<Description>();
268                        }
269                        descriptions.add(desc);
270                    }
271                } else {
272                    logger.info("skipping <description> for {}", new QName(
273                            namespaceURI, localName));
274                }
275                response.readEnd(OAI_NS, "description", true);
276                job.setDescriptions(descriptions);
277            }
278            response.readEnd(OAI_NS, "Identify");
279            response.close();
280
281            job.onIdentify();
282
283            return true;
284        } finally {
285            if (response != null) {
286                response.release(job);
287            }
288        }
289    }
290
291
292    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
293            throws IOException, XMLStreamException, HarvesterException {
294        Response response = null;
295        try {
296            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
297            response = execute(job, true);
298            if (response == null) {
299                return false;
300            }
301
302            response.readStart(OAI_NS, "ListMetadataFormats", true);
303            List<MetadataFormat> metadataFormats =
304                    new ArrayList<MetadataFormat>();
305            while (response.readStart(OAI_NS, "metadataFormat",
306                    metadataFormats.isEmpty())) {
307                final String prefix =
308                        response.readContent(OAI_NS, "metadataPrefix", true);
309                final String schema =
310                        response.readContent(OAI_NS, "schema", true);
311                final String namespace =
312                        response.readContent(OAI_NS, "metadataNamespace", true);
313                metadataFormats.add(
314                        new MetadataFormatImpl(prefix, schema, namespace));
315                response.readEnd(OAI_NS, "metadataFormat");
316            } // while
317            response.readEnd(OAI_NS, "ListMetadataFormats");
318            response.close();
319
320            job.onListMetadataFormats(metadataFormats);
321
322            return true;
323        } finally {
324            if (response != null) {
325                response.release(job);
326            }
327        }
328    }
329
330
331    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
332            XMLStreamException, HarvesterException {
333        Response response = null;
334        try {
335            String token = job.getResumptionToken();
336            uriBuilder.setVerb(VERB_LIST_RECORDS);
337            if (token == null) {
338                uriBuilder.addParameter(ARG_METADATA_PREFIX,
339                        job.getCurrentPrefix());
340                if (job.getFrom() != null) {
341                    uriBuilder.addParameter("from",
342                            formatDate(job.getGranularity(), job.getFrom()));
343                }
344                if (job.getUntil() != null) {
345                    uriBuilder.addParameter("until",
346                            formatDate(job.getGranularity(), job.getUntil()));
347                }
348                if (job.getSet() != null) {
349                    uriBuilder.addParameter("set", job.getSet());
350                }
351            } else {
352                uriBuilder.addParameter(ARG_RESUMPTION_TOKEN, token);
353                job.incResumptionCount();
354            }
355
356            response = execute(job, true);
357            if (response == null) {
358                return false;
359            }
360
361            response.readStart(OAI_NS, "ListRecords", true);
362            header.reset();
363            boolean first = true;
364            while (response.readStart(OAI_NS, "record", first)) {
365                boolean deleted = false;
366
367                response.readStart(OAI_NS, "header", true, true);
368                String s = response.readAttributeValue(null, "status");
369                if (s != null) {
370                    if ("deleted".equals(s)) {
371                        header.setDeleted(true);
372                        deleted = true;
373                    } else {
374                        throw new XMLStreamException("attribute 'status' of "
375                                + "element 'record' must contain 'deleted'",
376                                response.getLocation());
377                    }
378                }
379                response.consumeStart();
380                s = response.readContent(OAI_NS, "identifier", true);
381                header.setIdentifier(s);
382                s = response.readContent(OAI_NS, "datestamp", true);
383                header.setDatestamp(parseDate(job.getGranularity(), s));
384                while (response.readStart(OAI_NS, "setSpec", false)) {
385                    header.addSetSpec(response.readString(true));
386                    response.readEnd(OAI_NS, "setSpec", true);
387                }
388                response.readEnd(OAI_NS, "header");
389                job.onRecord(header);
390                /*
391                 * only parse metadata from non-deleted records
392                 */
393                if (!deleted) {
394                    if (response.readStart(OAI_NS, "metadata", true)) {
395                        response.consumeWhitespace();
396                        if (!response.getXMLStreamReader().isCharacters()) {
397                            readerWrapper.reset(response.getXMLStreamReader());
398                            job.onRecordMetadata(header, readerWrapper);
399                        } else {
400                            // FIXME: notify handler somehow of failed record
401                            logger.error("invalid metadata record with identifier '{}' contains only characters",
402                                    header.getIdentifier());
403                        }
404                        response.consumeWhitespace();
405                        response.readEnd(OAI_NS, "metadata", true);
406                    }
407                }
408                while (response.readStart(OAI_NS, "about", false)) {
409                    response.consumeWhitespace();
410                    job.onRecordAbout(header, null);
411                    response.consumeWhitespace();
412                    response.readEnd(OAI_NS, "about", true);
413                } // while (about)
414                response.readEnd(OAI_NS, "record");
415                first = false;
416                header.reset();
417            } // while (record)
418
419            /*
420             * expect resumptionToken (possibly empty) when request was resumed
421             */
422            String nextToken = null;
423            boolean got_token = false;
424            if (response.readStart(OAI_NS, "resumptionToken", false, true)) {
425                // XXX: read attributes here!
426                response.consumeStart();
427
428                nextToken = response.readString(false);
429                if (nextToken != null) {
430                    if (!nextToken.isEmpty()) {
431                        job.setResumptionToken(nextToken);
432                    } else {
433                        nextToken = null;
434                    }
435                }
436                response.readEnd(OAI_NS, "resumptionToken");
437                got_token = true;
438            }
439            if ((token != null) && (nextToken == null) && !got_token) {
440                logger.error("An <resumptionToken> element was expected, " +
441                        "but none was found; assuming a complete result list.");
442                logger.error("An OAI provider is required to serialize an " +
443                        "empty <resumptionToken> element to announce that " +
444                        "the result list is complete.");
445            }
446            response.readEnd(OAI_NS, "ListRecords");
447
448            response.close();
449
450            return (nextToken == null);
451        } catch (HarvesterProtocolErrorException e) {
452            /*
453             * XXX: handle NO_RECORDS_MATCH protocol error
454             */
455            List<ProtocolError> errors = e.getErrors();
456            if ((errors != null) && !errors.isEmpty()) {
457                for (ProtocolError error : errors) {
458                    if (error.isCode(ProtocolError.Code.NO_RECORDS_MATCH)) {
459                        logger.debug("no records match prefix '{}'",
460                                job.getCurrentPrefix());
461                        return true;
462                    }
463                }
464            }
465            throw e;
466        } finally {
467            if (response != null) {
468                response.release(job);
469            }
470        }
471    }
472
473
474    private Response execute(HarvestJobImpl job, boolean compress)
475            throws IOException, XMLStreamException, HarvesterException {
476        final long now = System.currentTimeMillis();
477        job.incRequestCount();
478        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
479        logger.debug("Performing HTTP GET request to {}", uri);
480        final HttpGet request = new HttpGet(uri);
481        if (compress) {
482            final int mask = job.getCompressionMask();
483            if ((mask & Repository.COMPRESSION_METHOD_GZIP) > 0) {
484                request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
485            }
486            if ((mask & Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
487                request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
488            }
489            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
490        }
491        request.addHeader(HEADER_CONNECTION, "close");
492        // FIXME: get timeout values from somewhere?
493        HttpParams params = request.getParams();
494        params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000);
495        params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
496
497        final HttpResponse response = client.execute(request);
498        /*
499         * FIXME: the following code need serious re-factoring
500         */
501        boolean close = false;
502        try {
503            job.addToNetworkTime(System.currentTimeMillis() - now);
504            final int status = response.getStatusLine().getStatusCode();
505            if (status == HttpStatus.SC_OK) {
506                delay = 0;
507                return new Response(job, response, harvester);
508            } else {
509                if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
510                    delay = parseTryAfter(response);
511                    logger.debug("got service unavailable status, retrying " +
512                            "after {} seconds", delay);
513                    close = true;
514                    return null;
515                } else {
516                    throw new HarvesterException(
517                            "provider returned unexpected HTTP status: " +
518                                    status);
519                }
520            }
521        } catch (IOException e) {
522            close = true;
523            throw e;
524        } catch (XMLStreamException e) {
525            close = true;
526            throw e;
527        } catch (HarvesterException e) {
528            close = true;
529            throw e;
530        } finally {
531            if (close) {
532                /*
533                 * try hard to release HTTP client resources ...
534                 */
535                try {
536                    EntityUtils.consume(response.getEntity());
537                } catch (IOException ex) {
538                    /* IGNORE */
539                }
540
541                /* make sure to release allocated resources */
542                HttpClientUtils.closeQuietly(response);
543
544                if (request != null) {
545                    request.abort();
546                }
547            }
548        }
549    }
550
551
552    private long parseTryAfter(HttpResponse response) {
553        long delay = -1;
554        final Header s = response.getFirstHeader("Retry-After");
555        if (s != null) {
556            final String v = s.getValue();
557            try {
558                /*
559                 * if plain number, it is the time in seconds
560                 */
561                delay = Integer.parseInt(v) * 1000;
562            } catch (NumberFormatException e) {
563                try {
564                    Date date = DateUtils.parseDate(v);
565                    delay = date.getTime() - System.currentTimeMillis();
566                } catch (DateParseException f) {
567                    /* IGNORE */
568                }
569            }
570        }
571        return (delay > 0) ? delay : 0;
572    }
573
574
575    private static Date parseDate(Repository.Granularity granularity, String s)
576            throws HarvesterException {
577        final MutableDateTime date = new MutableDateTime(0);
578        for (int i = 0; i < DATEFORMATS.length; i++) {
579            DateTimeFormatter fmt = DateTimeFormat
580                    .forPattern(DATEFORMATS[i])
581                    .withZone(DateTimeZone.UTC);
582            if (fmt.parseInto(date, s, 0) == s.length()) {
583                if (DATEFORMATS[i].equals(DATEFORMAT_SECONDS) &&
584                        (granularity == Repository.Granularity.DAYS)) {
585                    logger.warn("repository announced DAYS granularity but " +
586                            "provided timestamp with SECONDS granularity");
587                }
588                if (DATEFORMATS[i].equals(DATEFORMAT_DAYS)) {
589                    date.setTime(0, 0, 0, 0);
590                }
591                return date.toDate();
592            }
593        }
594        throw new HarvesterException("invalid date: " + s);
595    }
596
597
598    private static String formatDate(Repository.Granularity granularity,
599            Date date) throws HarvesterException {
600        DateTimeFormatter fmt = null;
601        switch (granularity) {
602        case DAYS:
603            fmt = DateTimeFormat.forPattern(DATEFORMAT_DAYS)
604                .withZone(DateTimeZone.UTC);
605            break;
606        case SECONDS:
607            fmt = DateTimeFormat.forPattern(DATEFORMAT_SECONDS)
608                .withZone(DateTimeZone.UTC);
609            break;
610        default:
611            /* cannot happen, but silence FindBugs warning */
612            throw new HarvesterException("invalid granularity: " + granularity);
613        }
614        return fmt.print(new DateTime(date));
615    }
616
617} // class HarvestWorker
Note: See TracBrowser for help on using the repository browser.