source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java @ 3011

Last change on this file since 3011 was 3011, checked in by oschonef, 11 years ago
  • update Woodstox, SLF4J and Apache Commons HTTP client dependencies
  • several interoperability fixes (including relaxed behavior)
  • several bug fixes
  • Property svn:eol-style set to native
File size: 23.1 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.net.URI;
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.List;
8
9import javax.xml.namespace.QName;
10import javax.xml.stream.XMLStreamException;
11
12import org.apache.http.Header;
13import org.apache.http.HttpResponse;
14import org.apache.http.HttpStatus;
15import org.apache.http.client.HttpClient;
16import org.apache.http.client.methods.HttpGet;
17import org.apache.http.client.utils.HttpClientUtils;
18import org.apache.http.impl.client.DefaultHttpClient;
19import org.apache.http.impl.cookie.DateParseException;
20import org.apache.http.impl.cookie.DateUtils;
21import org.apache.http.params.CoreConnectionPNames;
22import org.apache.http.params.CoreProtocolPNames;
23import org.apache.http.params.HttpParams;
24import org.apache.http.util.EntityUtils;
25import org.joda.time.DateTime;
26import org.joda.time.DateTimeZone;
27import org.joda.time.MutableDateTime;
28import org.joda.time.format.DateTimeFormat;
29import org.joda.time.format.DateTimeFormatter;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import eu.clarin.cmdi.oai.harvester.Description;
34import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
35import eu.clarin.cmdi.oai.harvester.HarvesterException;
36import eu.clarin.cmdi.oai.harvester.HarvesterProtocolErrorException;
37import eu.clarin.cmdi.oai.harvester.MetadataFormat;
38import eu.clarin.cmdi.oai.harvester.ProtocolError;
39import eu.clarin.cmdi.oai.harvester.Repository;
40import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
41import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
42import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser;
43import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;
44
45
46public class HarvestWorker {
47    private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/";
48    private static final String ARG_METADATA_PREFIX = "metadataPrefix";
49    private static final String ARG_RESUMPTION_TOKEN = "resumptionToken";
50    private static final String PROTOCOL_VERSION = "2.0";
51    private static final String DELETED_NOTION_NO = "no";
52    private static final String DELETED_NOTION_PERSISTENT = "persistent";
53    private static final String DELETED_NOTION_TRANSIENT = "transient";
54    private static final String GRANULARITY_DAYS = "YYYY-MM-DD";
55    private static final String GRANULARITY_SECONDS = "YYYY-MM-DDThh:mm:ssZ";
56    private static final String COMPRESSION_DEFLATE = "deflate";
57    private static final String COMPRESSION_GZIP = "gzip";
58    private static final String VERB_IDENTIFY = "Identify";
59    private static final String VERB_LIST_METADATAFORMATS = "ListMetadataFormats";
60    private static final String VERB_LIST_RECORDS = "ListRecords";
61    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
62    private static final String HEADER_CONNECTION = "Connection";
63    private static final String DATEFORMAT_SECONDS = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'";
64    private static final String DATEFORMAT_DAYS = "yyyy'-'MM'-'dd";
65    private static final String[] DATEFORMATS =
66        { DATEFORMAT_SECONDS, DATEFORMAT_DAYS };
67    private static final Logger logger =
68            LoggerFactory.getLogger(HarvestWorker.class);
69    private final AbstractHarvester harvester;
70    private final HttpClient client;
71    private final RequestUriBuilder uriBuilder;
72    private final HeaderImpl header;
73    private final XmlStreamReaderProxy readerWrapper;
74    private long delay;
75
76
77    HarvestWorker(AbstractHarvester harvester) {
78        this.harvester = harvester;
79        // http client
80        client = new DefaultHttpClient();
81        client.getParams().setParameter(CoreProtocolPNames.USER_AGENT,
82                "eu.clarin.cmdi.oai.Harvester/0.0.1");
83        // request stuff
84        uriBuilder = new RequestUriBuilder();
85        // other stuff
86        this.header        = new HeaderImpl();
87        this.readerWrapper = new XmlStreamReaderProxy();
88    }
89
90
91    long process(HarvestJobImpl job, int maxNetworkTries)
92            throws HarvesterException {
93        if (job.isState(State.CANCELED)) {
94            return -1;
95        }
96        if (!job.isState(State.RUNNING)) {
97            throw new HarvesterException("job is not in running state");
98        }
99
100        boolean ok;
101        try {
102            switch (job.getTask()) {
103            case IDENTIFY_REPOSITORY:
104                ok = doIdentifyTask(job);
105                if (ok) {
106                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
107                }
108                break;
109            case ENUMERATE_METADATA_FORMATS:
110                ok = doListMetadataFormatsTask(job);
111                if (ok) {
112                    job.setTask(Task.HARVEST_RECORDS);
113                }
114                break;
115            case HARVEST_RECORDS:
116                // check for new prefix and notify start, if needed
117                if (job.isHarvestingNewPrefix()) {
118                    job.onStartListingRecords();
119                }
120                ok = doListRecordsTask(job);
121                if (ok) {
122                    // notify end of prefix
123                    job.onFinishListingRecords();
124                    if (!job.removeCurrentPrefix()) {
125                        job.setState(State.DONE);
126                    }
127                } else if ((job.getResumptionToken() != null) && (delay == 0)) {
128                    /*
129                     * special case: request successful, but list was split due
130                     * to OAI flow control
131                     */
132                    ok = true;
133                }
134                break;
135            default:
136                throw new HarvesterException("internal error");
137            } // switch
138
139            if (ok) {
140                job.resetNetworkRetryCount();
141            } else {
142                if (delay > 0) {
143                    job.setNetworkRequestDelay(delay);
144                }
145                if (job.incNetworkRetryCount() >= maxNetworkTries) {
146                    throw new HarvesterException("too many network retries: " +
147                            job.getNetworkRetryCount());
148                }
149            }
150            return delay;
151        } catch (Throwable e) {
152            job.setState(State.ERROR);
153            if (e instanceof HarvesterException) {
154                throw (HarvesterException) e;
155            } else if (e instanceof XMLStreamException) {
156                throw new HarvesterException("malformed OAI response", e);
157            } else if (e instanceof IOException) {
158                throw new HarvesterException("network error", e);
159            } else {
160                throw new HarvesterException("unexpected exception", e);
161            }
162        }
163    }
164
165
166    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
167            XMLStreamException, HarvesterException {
168        Response response = null;
169        try {
170            uriBuilder.setVerb(VERB_IDENTIFY);
171            response = execute(job, false);
172            if (response == null) {
173                return false;
174            }
175
176            response.readStart(OAI_NS, "Identify", true);
177
178            String s = response.readContent(OAI_NS, "repositoryName", true);
179            job.setRepositoryName(s);
180
181            s = response.readContent(OAI_NS, "baseURL", true);
182            job.setBaseURL(s);
183
184            s = response.readContent(OAI_NS, "protocolVersion", true);
185            if (!PROTOCOL_VERSION.equals(s)) {
186                throw new HarvesterException(
187                        "unsupported protocol version: " + s);
188            }
189            job.setProtocolVersion(PROTOCOL_VERSION);
190
191            List<String> adminEmail = null;
192            do {
193                s = response.readContent(OAI_NS,
194                        "adminEmail", (adminEmail == null));
195                if (s != null) {
196                    if (adminEmail == null) {
197                        adminEmail = new ArrayList<String>();
198                    }
199                    adminEmail.add(s);
200                }
201            } while (s != null);
202            job.setAdminEmail(adminEmail);
203
204            /*
205             * defer setting of earliestDatestamp in job until we know the
206             * datestamp granularity of the repository
207             */
208            final String earliestDatestamp =
209                    response.readContent(OAI_NS, "earliestDatestamp", true);
210
211            s = response.readContent(OAI_NS, "deletedRecord", true);
212            if (DELETED_NOTION_NO.equals(s)) {
213                job.setDeletedNotion(DeletedNotion.NO);
214            } else if (DELETED_NOTION_PERSISTENT.equals(s)) {
215                job.setDeletedNotion(DeletedNotion.PERSISTENT);
216            } else if (DELETED_NOTION_TRANSIENT.equals(s)) {
217                job.setDeletedNotion(DeletedNotion.TRANSIENT);
218            } else {
219                throw new HarvesterException(
220                        "invalid value for 'deletedRecord': " + s);
221            }
222
223            s = response.readContent(OAI_NS, "granularity", true);
224            Granularity granularity = null;
225            if (GRANULARITY_DAYS.equals(s)) {
226                granularity = Granularity.DAYS;
227            } else if (GRANULARITY_SECONDS.equals(s)) {
228                granularity = Granularity.SECONDS;
229            } else {
230                throw new HarvesterException(
231                        "invalid value for 'granularity': " + s);
232            }
233            job.setGranularity(granularity);
234            job.setEarliestTimestamp(parseDate(granularity, earliestDatestamp));
235
236            int mask = 0;
237            do {
238                s = response.readContent(OAI_NS, "compression", false);
239                if (s != null) {
240                    if (COMPRESSION_DEFLATE.equals(s)) {
241                        mask |= Repository.COMPRESSION_METHOD_DEFLATE;
242                    } else if (COMPRESSION_GZIP.equals(s)) {
243                        mask |= Repository.COMPRESSION_METHOD_GZIP;
244                    }
245                }
246            } while (s != null);
247            job.setCompressionMask(mask);
248
249            List<Description> descriptions = null;
250            while (response.readStart(OAI_NS, "description", false)) {
251                response.consumeWhitespace();
252                final String namespaceURI = response.readNamespaceURI();
253                final String localName = response.peekElementLocalName();
254                DescriptionParser parser = harvester.findDescriptionParser(
255                        namespaceURI, localName);
256                if (parser != null) {
257                    Description desc = parser.parseDescription(
258                            response.getXMLStreamReader());
259                    if (desc != null) {
260                        if (descriptions == null) {
261                            descriptions = new ArrayList<Description>();
262                        }
263                        descriptions.add(desc);
264                    }
265                } else {
266                    logger.info("skipping <description> for {}", new QName(
267                            namespaceURI, localName));
268                }
269                response.readEnd(OAI_NS, "description", true);
270                job.setDescriptions(descriptions);
271            }
272            response.readEnd(OAI_NS, "Identify");
273            response.close();
274
275            job.onIdentify();
276
277            return true;
278        } finally {
279            if (response != null) {
280                response.release(job);
281            }
282        }
283    }
284
285
286    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
287            throws IOException, XMLStreamException, HarvesterException {
288        Response response = null;
289        try {
290            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
291            response = execute(job, true);
292            if (response == null) {
293                return false;
294            }
295
296            response.readStart(OAI_NS, "ListMetadataFormats", true);
297            List<MetadataFormat> metadataFormats =
298                    new ArrayList<MetadataFormat>();
299            while (response.readStart(OAI_NS, "metadataFormat",
300                    metadataFormats.isEmpty())) {
301                final String prefix =
302                        response.readContent(OAI_NS, "metadataPrefix", true);
303                final String schema =
304                        response.readContent(OAI_NS, "schema", true);
305                final String namespace =
306                        response.readContent(OAI_NS, "metadataNamespace", true);
307                metadataFormats.add(
308                        new MetadataFormatImpl(prefix, schema, namespace));
309                response.readEnd(OAI_NS, "metadataFormat");
310            } // while
311            response.readEnd(OAI_NS, "ListMetadataFormats");
312            response.close();
313
314            job.onListMetadataFormats(metadataFormats);
315
316            return true;
317        } finally {
318            if (response != null) {
319                response.release(job);
320            }
321        }
322    }
323
324
325    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
326            XMLStreamException, HarvesterException {
327        Response response = null;
328        try {
329            String token = job.getResumptionToken();
330            uriBuilder.setVerb(VERB_LIST_RECORDS);
331            if (token == null) {
332                uriBuilder.addParameter(ARG_METADATA_PREFIX,
333                        job.getCurrentPrefix());
334                if (job.getFrom() != null) {
335                    uriBuilder.addParameter("from",
336                            formatDate(job.getGranularity(), job.getFrom()));
337                }
338                if (job.getUntil() != null) {
339                    uriBuilder.addParameter("until",
340                            formatDate(job.getGranularity(), job.getUntil()));
341                }
342                if (job.getSet() != null) {
343                    uriBuilder.addParameter("set", job.getSet());
344                }
345            } else {
346                uriBuilder.addParameter(ARG_RESUMPTION_TOKEN, token);
347                job.incResumptionCount();
348            }
349
350            response = execute(job, true);
351            if (response == null) {
352                return false;
353            }
354
355            String nextToken = null;
356
357            response.readStart(OAI_NS, "ListRecords", true);
358            header.reset();
359            boolean first = true;
360            while (response.readStart(OAI_NS, "record", first)) {
361                boolean deleted = false;
362
363                response.readStart(OAI_NS, "header", true, true);
364                String s = response.readAttributeValue(null, "status");
365                if (s != null) {
366                    if ("deleted".equals(s)) {
367                        header.setDeleted(true);
368                        deleted = true;
369                    } else {
370                        throw new XMLStreamException("attribute 'status' of "
371                                + "element 'record' must contain 'deleted'",
372                                response.getLocation());
373                    }
374                }
375                response.consumeStart();
376                s = response.readContent(OAI_NS, "identifier", true);
377                header.setIdentifier(s);
378                s = response.readContent(OAI_NS, "datestamp", true);
379                header.setDatestamp(parseDate(job.getGranularity(), s));
380                while (response.readStart(OAI_NS, "setSpec", false)) {
381                    header.addSetSpec(response.readString(true));
382                    response.readEnd(OAI_NS, "setSpec", true);
383                }
384                response.readEnd(OAI_NS, "header");
385                job.onRecord(header);
386                /*
387                 * only parse metadata from non-deleted records
388                 */
389                if (!deleted) {
390                    if (response.readStart(OAI_NS, "metadata", true)) {
391                        response.consumeWhitespace();
392                        readerWrapper.reset(response.getXMLStreamReader());
393                        job.onRecordMetadata(header, readerWrapper);
394                        response.consumeWhitespace();
395                        response.readEnd(OAI_NS, "metadata");
396                    }
397                }
398                while (response.readStart(OAI_NS, "about", false)) {
399                    response.consumeWhitespace();
400                    job.onRecordAbout(header, null);
401                    response.consumeWhitespace();
402                    response.readEnd(OAI_NS, "about", true);
403                } // while (about)
404                response.readEnd(OAI_NS, "record");
405                first = false;
406                header.reset();
407            } // while (record)
408
409            /*
410             * expect resumptionToken (possibly empty) when request was resumed
411             */
412            if (response.readStart(OAI_NS, "resumptionToken",
413                    (token != null), true)) {
414                // XXX: read attributes here!
415                response.consumeStart();
416
417                nextToken = response.readString(false);
418                if (nextToken != null) {
419                    job.setResumptionToken(nextToken);
420                }
421                response.readEnd(OAI_NS, "resumptionToken");
422            }
423            response.readEnd(OAI_NS, "ListRecords");
424
425            response.close();
426
427            return (nextToken == null);
428        } catch (HarvesterProtocolErrorException e) {
429            /*
430             * XXX: handle NO_RECORDS_MATCH protocol error
431             */
432            List<ProtocolError> errors = e.getErrors();
433            if ((errors != null) && !errors.isEmpty()) {
434                for (ProtocolError error : errors) {
435                    if (error.isCode(ProtocolError.Code.NO_RECORDS_MATCH)) {
436                        logger.debug("no records match prefix '{}'",
437                                job.getCurrentPrefix());
438                        return true;
439                    }
440                }
441            }
442            throw e;
443        } finally {
444            if (response != null) {
445                response.release(job);
446            }
447        }
448    }
449
450
451    private Response execute(HarvestJobImpl job, boolean compress)
452            throws IOException, XMLStreamException, HarvesterException {
453        final long now = System.currentTimeMillis();
454        job.incRequestCount();
455        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
456        logger.debug("Performing HTTP GET request to {}", uri);
457        final HttpGet request = new HttpGet(uri);
458        if (compress) {
459            final int mask = job.getCompressionMask();
460            if ((mask & Repository.COMPRESSION_METHOD_GZIP) > 0) {
461                request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
462            }
463            if ((mask & Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
464                request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
465            }
466            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
467        }
468        request.addHeader(HEADER_CONNECTION, "close");
469        // FIXME: get timeout values from somewhere?
470        HttpParams params = request.getParams();
471        params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000);
472        params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
473
474        final HttpResponse response = client.execute(request);
475        /*
476         * FIXME: the following code need serious re-factoring
477         */
478        boolean close = false;
479        try {
480            job.addToNetworkTime(System.currentTimeMillis() - now);
481            final int status = response.getStatusLine().getStatusCode();
482            if (status == HttpStatus.SC_OK) {
483                delay = 0;
484                return new Response(job, response, harvester);
485            } else {
486                if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
487                    delay = parseTryAfter(response);
488                    logger.debug("got service unavailable status, retrying " +
489                            "after {} seconds", delay);
490                    close = true;
491                    return null;
492                } else {
493                    throw new HarvesterException(
494                            "provider returned unexpected HTTP status: " +
495                                    status);
496                }
497            }
498        } catch (IOException e) {
499            close = true;
500            throw e;
501        } catch (XMLStreamException e) {
502            close = true;
503            throw e;
504        } catch (HarvesterException e) {
505            close = true;
506            throw e;
507        } finally {
508            if (close) {
509                /*
510                 * try hard to release HTTP client resources ...
511                 */
512                try {
513                    EntityUtils.consume(response.getEntity());
514                } catch (IOException ex) {
515                    /* IGNORE */
516                }
517
518                /* make sure to release allocated resources */
519                HttpClientUtils.closeQuietly(response);
520
521                if (request != null) {
522                    request.abort();
523                }
524            }
525        }
526    }
527
528
529    private long parseTryAfter(HttpResponse response) {
530        long delay = -1;
531        final Header s = response.getFirstHeader("Retry-After");
532        if (s != null) {
533            final String v = s.getValue();
534            try {
535                /*
536                 * if plain number, it is the time in seconds
537                 */
538                delay = Integer.parseInt(v) * 1000;
539            } catch (NumberFormatException e) {
540                try {
541                    Date date = DateUtils.parseDate(v);
542                    delay = date.getTime() - System.currentTimeMillis();
543                } catch (DateParseException f) {
544                    /* IGNORE */
545                }
546            }
547        }
548        return (delay > 0) ? delay : 0;
549    }
550
551
552    private static Date parseDate(Repository.Granularity granularity, String s)
553            throws HarvesterException {
554        final MutableDateTime date = new MutableDateTime(0);
555        for (int i = 0; i < DATEFORMATS.length; i++) {
556            DateTimeFormatter fmt = DateTimeFormat
557                    .forPattern(DATEFORMATS[i])
558                    .withZone(DateTimeZone.UTC);
559            if (fmt.parseInto(date, s, 0) == s.length()) {
560                if (DATEFORMATS[i].equals(DATEFORMAT_SECONDS) &&
561                        (granularity == Repository.Granularity.DAYS)) {
562                    logger.warn("repository announced DAYS granularity but " +
563                            "provided timestamp with SECONDS granularity");
564                }
565                if (DATEFORMATS[i].equals(DATEFORMAT_DAYS)) {
566                    date.setTime(0, 0, 0, 0);
567                }
568                return date.toDate();
569            }
570        }
571        throw new HarvesterException("invalid date: " + s);
572    }
573
574
575    private static String formatDate(Repository.Granularity granularity,
576            Date date) throws HarvesterException {
577        DateTimeFormatter fmt = null;
578        switch (granularity) {
579        case DAYS:
580            fmt = DateTimeFormat.forPattern(DATEFORMAT_DAYS)
581                .withZone(DateTimeZone.UTC);
582            break;
583        case SECONDS:
584            fmt = DateTimeFormat.forPattern(DATEFORMAT_SECONDS)
585                .withZone(DateTimeZone.UTC);
586            break;
587        default:
588            /* cannot happen, but silence FindBugs warning */
589            throw new HarvesterException("invalid granularity: " + granularity);
590        }
591        return fmt.print(new DateTime(date));
592    }
593
594} // class HarvestWorker
Note: See TracBrowser for help on using the repository browser.