source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java @ 3014

Last change on this file since 3014 was 3014, checked in by oschonef, 11 years ago
  • just skip empty <compression> elements or unsupported compression methods within Identify response
  • Property svn:eol-style set to native
File size: 23.4 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.net.URI;
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.List;
8
9import javax.xml.namespace.QName;
10import javax.xml.stream.XMLStreamException;
11
12import org.apache.http.Header;
13import org.apache.http.HttpResponse;
14import org.apache.http.HttpStatus;
15import org.apache.http.client.HttpClient;
16import org.apache.http.client.methods.HttpGet;
17import org.apache.http.client.utils.HttpClientUtils;
18import org.apache.http.impl.client.DefaultHttpClient;
19import org.apache.http.impl.cookie.DateParseException;
20import org.apache.http.impl.cookie.DateUtils;
21import org.apache.http.params.CoreConnectionPNames;
22import org.apache.http.params.CoreProtocolPNames;
23import org.apache.http.params.HttpParams;
24import org.apache.http.util.EntityUtils;
25import org.joda.time.DateTime;
26import org.joda.time.DateTimeZone;
27import org.joda.time.MutableDateTime;
28import org.joda.time.format.DateTimeFormat;
29import org.joda.time.format.DateTimeFormatter;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33import eu.clarin.cmdi.oai.harvester.Description;
34import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
35import eu.clarin.cmdi.oai.harvester.HarvesterException;
36import eu.clarin.cmdi.oai.harvester.HarvesterProtocolErrorException;
37import eu.clarin.cmdi.oai.harvester.MetadataFormat;
38import eu.clarin.cmdi.oai.harvester.ProtocolError;
39import eu.clarin.cmdi.oai.harvester.Repository;
40import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
41import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
42import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser;
43import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;
44
45
46public class HarvestWorker {
47    private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/";
48    private static final String ARG_METADATA_PREFIX = "metadataPrefix";
49    private static final String ARG_RESUMPTION_TOKEN = "resumptionToken";
50    private static final String PROTOCOL_VERSION = "2.0";
51    private static final String DELETED_NOTION_NO = "no";
52    private static final String DELETED_NOTION_PERSISTENT = "persistent";
53    private static final String DELETED_NOTION_TRANSIENT = "transient";
54    private static final String GRANULARITY_DAYS = "YYYY-MM-DD";
55    private static final String GRANULARITY_SECONDS = "YYYY-MM-DDThh:mm:ssZ";
56    private static final String COMPRESSION_DEFLATE = "deflate";
57    private static final String COMPRESSION_GZIP = "gzip";
58    private static final String VERB_IDENTIFY = "Identify";
59    private static final String VERB_LIST_METADATAFORMATS = "ListMetadataFormats";
60    private static final String VERB_LIST_RECORDS = "ListRecords";
61    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
62    private static final String HEADER_CONNECTION = "Connection";
63    private static final String DATEFORMAT_SECONDS = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'";
64    private static final String DATEFORMAT_DAYS = "yyyy'-'MM'-'dd";
65    private static final String[] DATEFORMATS =
66        { DATEFORMAT_SECONDS, DATEFORMAT_DAYS };
67    private static final Logger logger =
68            LoggerFactory.getLogger(HarvestWorker.class);
69    private final AbstractHarvester harvester;
70    private final HttpClient client;
71    private final RequestUriBuilder uriBuilder;
72    private final HeaderImpl header;
73    private final XmlStreamReaderProxy readerWrapper;
74    private long delay;
75
76
77    HarvestWorker(AbstractHarvester harvester) {
78        this.harvester = harvester;
79        // http client
80        client = new DefaultHttpClient();
81        client.getParams().setParameter(CoreProtocolPNames.USER_AGENT,
82                "eu.clarin.cmdi.oai.Harvester/0.0.1");
83        // request stuff
84        uriBuilder = new RequestUriBuilder();
85        // other stuff
86        this.header        = new HeaderImpl();
87        this.readerWrapper = new XmlStreamReaderProxy();
88    }
89
90
91    long process(HarvestJobImpl job, int maxNetworkTries)
92            throws HarvesterException {
93        if (job.isState(State.CANCELED)) {
94            return -1;
95        }
96        if (!job.isState(State.RUNNING)) {
97            throw new HarvesterException("job is not in running state");
98        }
99
100        boolean ok;
101        try {
102            switch (job.getTask()) {
103            case IDENTIFY_REPOSITORY:
104                ok = doIdentifyTask(job);
105                if (ok) {
106                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
107                }
108                break;
109            case ENUMERATE_METADATA_FORMATS:
110                ok = doListMetadataFormatsTask(job);
111                if (ok) {
112                    job.setTask(Task.HARVEST_RECORDS);
113                }
114                break;
115            case HARVEST_RECORDS:
116                // check for new prefix and notify start, if needed
117                if (job.isHarvestingNewPrefix()) {
118                    job.onStartListingRecords();
119                }
120                ok = doListRecordsTask(job);
121                if (ok) {
122                    // notify end of prefix
123                    job.onFinishListingRecords();
124                    if (!job.removeCurrentPrefix()) {
125                        job.setState(State.DONE);
126                    }
127                } else if ((job.getResumptionToken() != null) && (delay == 0)) {
128                    /*
129                     * special case: request successful, but list was split due
130                     * to OAI flow control
131                     */
132                    ok = true;
133                }
134                break;
135            default:
136                throw new HarvesterException("internal error");
137            } // switch
138
139            if (ok) {
140                job.resetNetworkRetryCount();
141            } else {
142                if (delay > 0) {
143                    job.setNetworkRequestDelay(delay);
144                }
145                if (job.incNetworkRetryCount() >= maxNetworkTries) {
146                    throw new HarvesterException("too many network retries: " +
147                            job.getNetworkRetryCount());
148                }
149            }
150            return delay;
151        } catch (Throwable e) {
152            job.setState(State.ERROR);
153            if (e instanceof HarvesterException) {
154                throw (HarvesterException) e;
155            } else if (e instanceof XMLStreamException) {
156                throw new HarvesterException("malformed OAI response", e);
157            } else if (e instanceof IOException) {
158                throw new HarvesterException("network error", e);
159            } else {
160                throw new HarvesterException("unexpected exception", e);
161            }
162        }
163    }
164
165
166    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
167            XMLStreamException, HarvesterException {
168        Response response = null;
169        try {
170            uriBuilder.setVerb(VERB_IDENTIFY);
171            response = execute(job, false);
172            if (response == null) {
173                return false;
174            }
175
176            response.readStart(OAI_NS, "Identify", true);
177
178            String s = response.readContent(OAI_NS, "repositoryName", true);
179            job.setRepositoryName(s);
180
181            s = response.readContent(OAI_NS, "baseURL", true);
182            job.setBaseURL(s);
183
184            s = response.readContent(OAI_NS, "protocolVersion", true);
185            if (!PROTOCOL_VERSION.equals(s)) {
186                throw new HarvesterException(
187                        "unsupported protocol version: " + s);
188            }
189            job.setProtocolVersion(PROTOCOL_VERSION);
190
191            List<String> adminEmail = null;
192            do {
193                s = response.readContent(OAI_NS,
194                        "adminEmail", (adminEmail == null));
195                if (s != null) {
196                    if (adminEmail == null) {
197                        adminEmail = new ArrayList<String>();
198                    }
199                    adminEmail.add(s);
200                }
201            } while (s != null);
202            job.setAdminEmail(adminEmail);
203
204            /*
205             * defer setting of earliestDatestamp in job until we know the
206             * datestamp granularity of the repository
207             */
208            final String earliestDatestamp =
209                    response.readContent(OAI_NS, "earliestDatestamp", true);
210
211            s = response.readContent(OAI_NS, "deletedRecord", true);
212            if (DELETED_NOTION_NO.equals(s)) {
213                job.setDeletedNotion(DeletedNotion.NO);
214            } else if (DELETED_NOTION_PERSISTENT.equals(s)) {
215                job.setDeletedNotion(DeletedNotion.PERSISTENT);
216            } else if (DELETED_NOTION_TRANSIENT.equals(s)) {
217                job.setDeletedNotion(DeletedNotion.TRANSIENT);
218            } else {
219                throw new HarvesterException(
220                        "invalid value for 'deletedRecord': " + s);
221            }
222
223            s = response.readContent(OAI_NS, "granularity", true);
224            Granularity granularity = null;
225            if (GRANULARITY_DAYS.equals(s)) {
226                granularity = Granularity.DAYS;
227            } else if (GRANULARITY_SECONDS.equals(s)) {
228                granularity = Granularity.SECONDS;
229            } else {
230                throw new HarvesterException(
231                        "invalid value for 'granularity': " + s);
232            }
233            job.setGranularity(granularity);
234            job.setEarliestTimestamp(parseDate(granularity, earliestDatestamp));
235
236            int mask = 0;
237            do {
238                s = response.readContent(OAI_NS, "compression", false, false);
239                if (s != null) {
240                    if (COMPRESSION_DEFLATE.equals(s)) {
241                        mask |= Repository.COMPRESSION_METHOD_DEFLATE;
242                    } else if (COMPRESSION_GZIP.equals(s)) {
243                        mask |= Repository.COMPRESSION_METHOD_GZIP;
244                    } else {
245                        logger.info("ignoring unsupported compression " +
246                                "method '{}'", s);
247                    }
248                } else {
249                    logger.warn("skipping empty <compression> element " +
250                            "in Identify response");
251                }
252            } while (s != null);
253            job.setCompressionMask(mask);
254
255            List<Description> descriptions = null;
256            while (response.readStart(OAI_NS, "description", false)) {
257                response.consumeWhitespace();
258                final String namespaceURI = response.readNamespaceURI();
259                final String localName = response.peekElementLocalName();
260                DescriptionParser parser = harvester.findDescriptionParser(
261                        namespaceURI, localName);
262                if (parser != null) {
263                    Description desc = parser.parseDescription(
264                            response.getXMLStreamReader());
265                    if (desc != null) {
266                        if (descriptions == null) {
267                            descriptions = new ArrayList<Description>();
268                        }
269                        descriptions.add(desc);
270                    }
271                } else {
272                    logger.info("skipping <description> for {}", new QName(
273                            namespaceURI, localName));
274                }
275                response.readEnd(OAI_NS, "description", true);
276                job.setDescriptions(descriptions);
277            }
278            response.readEnd(OAI_NS, "Identify");
279            response.close();
280
281            job.onIdentify();
282
283            return true;
284        } finally {
285            if (response != null) {
286                response.release(job);
287            }
288        }
289    }
290
291
292    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
293            throws IOException, XMLStreamException, HarvesterException {
294        Response response = null;
295        try {
296            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
297            response = execute(job, true);
298            if (response == null) {
299                return false;
300            }
301
302            response.readStart(OAI_NS, "ListMetadataFormats", true);
303            List<MetadataFormat> metadataFormats =
304                    new ArrayList<MetadataFormat>();
305            while (response.readStart(OAI_NS, "metadataFormat",
306                    metadataFormats.isEmpty())) {
307                final String prefix =
308                        response.readContent(OAI_NS, "metadataPrefix", true);
309                final String schema =
310                        response.readContent(OAI_NS, "schema", true);
311                final String namespace =
312                        response.readContent(OAI_NS, "metadataNamespace", true);
313                metadataFormats.add(
314                        new MetadataFormatImpl(prefix, schema, namespace));
315                response.readEnd(OAI_NS, "metadataFormat");
316            } // while
317            response.readEnd(OAI_NS, "ListMetadataFormats");
318            response.close();
319
320            job.onListMetadataFormats(metadataFormats);
321
322            return true;
323        } finally {
324            if (response != null) {
325                response.release(job);
326            }
327        }
328    }
329
330
331    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
332            XMLStreamException, HarvesterException {
333        Response response = null;
334        try {
335            String token = job.getResumptionToken();
336            uriBuilder.setVerb(VERB_LIST_RECORDS);
337            if (token == null) {
338                uriBuilder.addParameter(ARG_METADATA_PREFIX,
339                        job.getCurrentPrefix());
340                if (job.getFrom() != null) {
341                    uriBuilder.addParameter("from",
342                            formatDate(job.getGranularity(), job.getFrom()));
343                }
344                if (job.getUntil() != null) {
345                    uriBuilder.addParameter("until",
346                            formatDate(job.getGranularity(), job.getUntil()));
347                }
348                if (job.getSet() != null) {
349                    uriBuilder.addParameter("set", job.getSet());
350                }
351            } else {
352                uriBuilder.addParameter(ARG_RESUMPTION_TOKEN, token);
353                job.incResumptionCount();
354            }
355
356            response = execute(job, true);
357            if (response == null) {
358                return false;
359            }
360
361            String nextToken = null;
362
363            response.readStart(OAI_NS, "ListRecords", true);
364            header.reset();
365            boolean first = true;
366            while (response.readStart(OAI_NS, "record", first)) {
367                boolean deleted = false;
368
369                response.readStart(OAI_NS, "header", true, true);
370                String s = response.readAttributeValue(null, "status");
371                if (s != null) {
372                    if ("deleted".equals(s)) {
373                        header.setDeleted(true);
374                        deleted = true;
375                    } else {
376                        throw new XMLStreamException("attribute 'status' of "
377                                + "element 'record' must contain 'deleted'",
378                                response.getLocation());
379                    }
380                }
381                response.consumeStart();
382                s = response.readContent(OAI_NS, "identifier", true);
383                header.setIdentifier(s);
384                s = response.readContent(OAI_NS, "datestamp", true);
385                header.setDatestamp(parseDate(job.getGranularity(), s));
386                while (response.readStart(OAI_NS, "setSpec", false)) {
387                    header.addSetSpec(response.readString(true));
388                    response.readEnd(OAI_NS, "setSpec", true);
389                }
390                response.readEnd(OAI_NS, "header");
391                job.onRecord(header);
392                /*
393                 * only parse metadata from non-deleted records
394                 */
395                if (!deleted) {
396                    if (response.readStart(OAI_NS, "metadata", true)) {
397                        response.consumeWhitespace();
398                        readerWrapper.reset(response.getXMLStreamReader());
399                        job.onRecordMetadata(header, readerWrapper);
400                        response.consumeWhitespace();
401                        response.readEnd(OAI_NS, "metadata");
402                    }
403                }
404                while (response.readStart(OAI_NS, "about", false)) {
405                    response.consumeWhitespace();
406                    job.onRecordAbout(header, null);
407                    response.consumeWhitespace();
408                    response.readEnd(OAI_NS, "about", true);
409                } // while (about)
410                response.readEnd(OAI_NS, "record");
411                first = false;
412                header.reset();
413            } // while (record)
414
415            /*
416             * expect resumptionToken (possibly empty) when request was resumed
417             */
418            if (response.readStart(OAI_NS, "resumptionToken",
419                    (token != null), true)) {
420                // XXX: read attributes here!
421                response.consumeStart();
422
423                nextToken = response.readString(false);
424                if (nextToken != null) {
425                    job.setResumptionToken(nextToken);
426                }
427                response.readEnd(OAI_NS, "resumptionToken");
428            }
429            response.readEnd(OAI_NS, "ListRecords");
430
431            response.close();
432
433            return (nextToken == null);
434        } catch (HarvesterProtocolErrorException e) {
435            /*
436             * XXX: handle NO_RECORDS_MATCH protocol error
437             */
438            List<ProtocolError> errors = e.getErrors();
439            if ((errors != null) && !errors.isEmpty()) {
440                for (ProtocolError error : errors) {
441                    if (error.isCode(ProtocolError.Code.NO_RECORDS_MATCH)) {
442                        logger.debug("no records match prefix '{}'",
443                                job.getCurrentPrefix());
444                        return true;
445                    }
446                }
447            }
448            throw e;
449        } finally {
450            if (response != null) {
451                response.release(job);
452            }
453        }
454    }
455
456
457    private Response execute(HarvestJobImpl job, boolean compress)
458            throws IOException, XMLStreamException, HarvesterException {
459        final long now = System.currentTimeMillis();
460        job.incRequestCount();
461        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
462        logger.debug("Performing HTTP GET request to {}", uri);
463        final HttpGet request = new HttpGet(uri);
464        if (compress) {
465            final int mask = job.getCompressionMask();
466            if ((mask & Repository.COMPRESSION_METHOD_GZIP) > 0) {
467                request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
468            }
469            if ((mask & Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
470                request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
471            }
472            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
473        }
474        request.addHeader(HEADER_CONNECTION, "close");
475        // FIXME: get timeout values from somewhere?
476        HttpParams params = request.getParams();
477        params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000);
478        params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
479
480        final HttpResponse response = client.execute(request);
481        /*
482         * FIXME: the following code need serious re-factoring
483         */
484        boolean close = false;
485        try {
486            job.addToNetworkTime(System.currentTimeMillis() - now);
487            final int status = response.getStatusLine().getStatusCode();
488            if (status == HttpStatus.SC_OK) {
489                delay = 0;
490                return new Response(job, response, harvester);
491            } else {
492                if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
493                    delay = parseTryAfter(response);
494                    logger.debug("got service unavailable status, retrying " +
495                            "after {} seconds", delay);
496                    close = true;
497                    return null;
498                } else {
499                    throw new HarvesterException(
500                            "provider returned unexpected HTTP status: " +
501                                    status);
502                }
503            }
504        } catch (IOException e) {
505            close = true;
506            throw e;
507        } catch (XMLStreamException e) {
508            close = true;
509            throw e;
510        } catch (HarvesterException e) {
511            close = true;
512            throw e;
513        } finally {
514            if (close) {
515                /*
516                 * try hard to release HTTP client resources ...
517                 */
518                try {
519                    EntityUtils.consume(response.getEntity());
520                } catch (IOException ex) {
521                    /* IGNORE */
522                }
523
524                /* make sure to release allocated resources */
525                HttpClientUtils.closeQuietly(response);
526
527                if (request != null) {
528                    request.abort();
529                }
530            }
531        }
532    }
533
534
535    private long parseTryAfter(HttpResponse response) {
536        long delay = -1;
537        final Header s = response.getFirstHeader("Retry-After");
538        if (s != null) {
539            final String v = s.getValue();
540            try {
541                /*
542                 * if plain number, it is the time in seconds
543                 */
544                delay = Integer.parseInt(v) * 1000;
545            } catch (NumberFormatException e) {
546                try {
547                    Date date = DateUtils.parseDate(v);
548                    delay = date.getTime() - System.currentTimeMillis();
549                } catch (DateParseException f) {
550                    /* IGNORE */
551                }
552            }
553        }
554        return (delay > 0) ? delay : 0;
555    }
556
557
558    private static Date parseDate(Repository.Granularity granularity, String s)
559            throws HarvesterException {
560        final MutableDateTime date = new MutableDateTime(0);
561        for (int i = 0; i < DATEFORMATS.length; i++) {
562            DateTimeFormatter fmt = DateTimeFormat
563                    .forPattern(DATEFORMATS[i])
564                    .withZone(DateTimeZone.UTC);
565            if (fmt.parseInto(date, s, 0) == s.length()) {
566                if (DATEFORMATS[i].equals(DATEFORMAT_SECONDS) &&
567                        (granularity == Repository.Granularity.DAYS)) {
568                    logger.warn("repository announced DAYS granularity but " +
569                            "provided timestamp with SECONDS granularity");
570                }
571                if (DATEFORMATS[i].equals(DATEFORMAT_DAYS)) {
572                    date.setTime(0, 0, 0, 0);
573                }
574                return date.toDate();
575            }
576        }
577        throw new HarvesterException("invalid date: " + s);
578    }
579
580
581    private static String formatDate(Repository.Granularity granularity,
582            Date date) throws HarvesterException {
583        DateTimeFormatter fmt = null;
584        switch (granularity) {
585        case DAYS:
586            fmt = DateTimeFormat.forPattern(DATEFORMAT_DAYS)
587                .withZone(DateTimeZone.UTC);
588            break;
589        case SECONDS:
590            fmt = DateTimeFormat.forPattern(DATEFORMAT_SECONDS)
591                .withZone(DateTimeZone.UTC);
592            break;
593        default:
594            /* cannot happen, but silence FindBugs warning */
595            throw new HarvesterException("invalid granularity: " + granularity);
596        }
597        return fmt.print(new DateTime(date));
598    }
599
600} // class HarvestWorker
Note: See TracBrowser for help on using the repository browser.