source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java @ 5458

Last change on this file since 5458 was 5458, checked in by Oliver Schonefeld, 10 years ago
  • clean up pom.xml
  • update some dependencies
  • update to Apache Http Client 4.3.x
  • Property svn:eol-style set to native
File size: 25.3 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.net.URI;
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.List;
8
9import javax.xml.namespace.QName;
10import javax.xml.stream.XMLStreamException;
11
12import org.apache.http.Header;
13import org.apache.http.HttpResponse;
14import org.apache.http.HttpStatus;
15import org.apache.http.client.config.CookieSpecs;
16import org.apache.http.client.config.RequestConfig;
17import org.apache.http.client.methods.CloseableHttpResponse;
18import org.apache.http.client.methods.HttpGet;
19import org.apache.http.client.utils.DateUtils;
20import org.apache.http.config.SocketConfig;
21import org.apache.http.impl.NoConnectionReuseStrategy;
22import org.apache.http.impl.client.CloseableHttpClient;
23import org.apache.http.impl.client.HttpClients;
24import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
25import org.apache.http.protocol.BasicHttpContext;
26import org.joda.time.DateTime;
27import org.joda.time.DateTimeZone;
28import org.joda.time.MutableDateTime;
29import org.joda.time.format.DateTimeFormat;
30import org.joda.time.format.DateTimeFormatter;
31import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import eu.clarin.cmdi.oai.harvester.Description;
35import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
36import eu.clarin.cmdi.oai.harvester.HarvesterException;
37import eu.clarin.cmdi.oai.harvester.HarvesterProtocolErrorException;
38import eu.clarin.cmdi.oai.harvester.MetadataFormat;
39import eu.clarin.cmdi.oai.harvester.ProtocolError;
40import eu.clarin.cmdi.oai.harvester.Repository;
41import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
42import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
43import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser;
44import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;
45
46
47public class HarvestWorker {
48    private static final String USER_AGENT = "OAI-Harvester/1.0.0";
49    private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/";
50    private static final String ARG_METADATA_PREFIX = "metadataPrefix";
51    private static final String ARG_RESUMPTION_TOKEN = "resumptionToken";
52    private static final String PROTOCOL_VERSION = "2.0";
53    private static final String DELETED_NOTION_NO = "no";
54    private static final String DELETED_NOTION_PERSISTENT = "persistent";
55    private static final String DELETED_NOTION_TRANSIENT = "transient";
56    private static final String GRANULARITY_DAYS = "YYYY-MM-DD";
57    private static final String GRANULARITY_SECONDS = "YYYY-MM-DDThh:mm:ssZ";
58    private static final String COMPRESSION_DEFLATE = "deflate";
59    private static final String COMPRESSION_GZIP = "gzip";
60    private static final String VERB_IDENTIFY = "Identify";
61    private static final String VERB_LIST_METADATAFORMATS = "ListMetadataFormats";
62    private static final String VERB_LIST_RECORDS = "ListRecords";
63    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
64    private static final String HEADER_CONNECTION = "Connection";
65    private static final String DATEFORMAT_SECONDS = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'";
66    private static final String DATEFORMAT_DAYS = "yyyy'-'MM'-'dd";
67    private static final String[] DATEFORMATS =
68        { DATEFORMAT_SECONDS, DATEFORMAT_DAYS };
69    private static final Logger logger =
70            LoggerFactory.getLogger(HarvestWorker.class);
71    private final AbstractHarvester harvester;
72    private final CloseableHttpClient client;
73    private final RequestUriBuilder uriBuilder;
74    private final HeaderImpl header;
75    private final XmlStreamReaderProxy readerWrapper;
76    private long delay;
77
78
79    HarvestWorker(AbstractHarvester harvester) {
80        this.harvester = harvester;
81
82        // create HTTP client
83        // FIXME: get timeout values from somewhere?
84        final int connectTimeout = 30 * 1000;
85        final int socketTimeout = 180 * 1000;
86        client = createHttpClient(connectTimeout, socketTimeout);
87
88        // request stuff
89        uriBuilder = new RequestUriBuilder();
90        // other stuff
91        this.header        = new HeaderImpl();
92        this.readerWrapper = new XmlStreamReaderProxy();
93    }
94
95
96    long process(HarvestJobImpl job, int maxNetworkTries)
97            throws HarvesterException {
98        if (job.isState(State.CANCELED)) {
99            return -1;
100        }
101        if (!job.isState(State.RUNNING)) {
102            throw new HarvesterException("job is not in running state");
103        }
104
105        boolean ok;
106        try {
107            switch (job.getTask()) {
108            case IDENTIFY_REPOSITORY:
109                ok = doIdentifyTask(job);
110                if (ok) {
111                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
112                }
113                break;
114            case ENUMERATE_METADATA_FORMATS:
115                ok = doListMetadataFormatsTask(job);
116                if (ok) {
117                    job.setTask(Task.HARVEST_RECORDS);
118                }
119                break;
120            case HARVEST_RECORDS:
121                // check for new prefix and notify start, if needed
122                if (job.isHarvestingNewPrefix()) {
123                    job.onStartListingRecords();
124                }
125                ok = doListRecordsTask(job);
126                if (ok) {
127                    // notify end of prefix
128                    job.onFinishListingRecords();
129                    if (!job.removeCurrentPrefix()) {
130                        job.setState(State.DONE);
131                    }
132                } else if ((job.getResumptionToken() != null) && (delay == 0)) {
133                    /*
134                     * special case: request successful, but list was split due
135                     * to OAI flow control
136                     */
137                    ok = true;
138                }
139                break;
140            default:
141                throw new HarvesterException("internal error");
142            } // switch
143
144            if (ok) {
145                job.resetNetworkRetryCount();
146            } else {
147                if (delay > 0) {
148                    job.setNetworkRequestDelay(delay);
149                }
150                if (job.incNetworkRetryCount() >= maxNetworkTries) {
151                    throw new HarvesterException("too many network retries: " +
152                            job.getNetworkRetryCount());
153                }
154            }
155            return delay;
156        } catch (Throwable e) {
157            job.setState(State.ERROR);
158            if (e instanceof HarvesterException) {
159                throw (HarvesterException) e;
160            } else if (e instanceof XMLStreamException) {
161                throw new HarvesterException("malformed OAI response", e);
162            } else if (e instanceof IOException) {
163                throw new HarvesterException("network error", e);
164            } else {
165                throw new HarvesterException("unexpected exception", e);
166            }
167        }
168    }
169
170
171    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
172            XMLStreamException, HarvesterException {
173        Response response = null;
174        try {
175            uriBuilder.setVerb(VERB_IDENTIFY);
176            response = execute(job, false);
177            if (response == null) {
178                return false;
179            }
180
181            response.readStart(OAI_NS, "Identify", true);
182
183            String s = response.readContent(OAI_NS, "repositoryName", true);
184            job.setRepositoryName(s);
185
186            s = response.readContent(OAI_NS, "baseURL", true);
187            job.setBaseURL(s);
188
189            s = response.readContent(OAI_NS, "protocolVersion", true);
190            if (!PROTOCOL_VERSION.equals(s)) {
191                throw new HarvesterException(
192                        "unsupported protocol version: " + s);
193            }
194            job.setProtocolVersion(PROTOCOL_VERSION);
195
196            List<String> adminEmail = null;
197            do {
198                s = response.readContent(OAI_NS,
199                        "adminEmail", (adminEmail == null));
200                if (s != null) {
201                    if (adminEmail == null) {
202                        adminEmail = new ArrayList<String>();
203                    }
204                    adminEmail.add(s);
205                }
206            } while (s != null);
207            job.setAdminEmail(adminEmail);
208
209            /*
210             * defer setting of earliestDatestamp in job until we know the
211             * datestamp granularity of the repository
212             */
213            final String earliestDatestamp =
214                    response.readContent(OAI_NS, "earliestDatestamp", true);
215
216            s = response.readContent(OAI_NS, "deletedRecord", true);
217            if (DELETED_NOTION_NO.equals(s)) {
218                job.setDeletedNotion(DeletedNotion.NO);
219            } else if (DELETED_NOTION_PERSISTENT.equals(s)) {
220                job.setDeletedNotion(DeletedNotion.PERSISTENT);
221            } else if (DELETED_NOTION_TRANSIENT.equals(s)) {
222                job.setDeletedNotion(DeletedNotion.TRANSIENT);
223            } else {
224                throw new HarvesterException(
225                        "invalid value for 'deletedRecord': " + s);
226            }
227
228            s = response.readContent(OAI_NS, "granularity", true);
229            Granularity granularity = null;
230            if (GRANULARITY_DAYS.equals(s)) {
231                granularity = Granularity.DAYS;
232            } else if (GRANULARITY_SECONDS.equals(s)) {
233                granularity = Granularity.SECONDS;
234            } else {
235                throw new HarvesterException(
236                        "invalid value for 'granularity': " + s);
237            }
238            job.setGranularity(granularity);
239            job.setEarliestTimestamp(parseDate(granularity, earliestDatestamp));
240
241            int mask = 0;
242            do {
243                s = response.readContent(OAI_NS, "compression", false, false);
244                if (s != null) {
245                    if (s.isEmpty()) {
246                        logger.warn("skipping empty <compression> element " +
247                                "in Identify response");
248                    } else if (COMPRESSION_DEFLATE.equals(s)) {
249                        mask |= Repository.COMPRESSION_METHOD_DEFLATE;
250                    } else if (COMPRESSION_GZIP.equals(s)) {
251                        mask |= Repository.COMPRESSION_METHOD_GZIP;
252                    } else {
253                        logger.info("ignoring unsupported compression " +
254                                "method '{}'", s);
255                    }
256                }
257            } while (s != null);
258            job.setCompressionMask(mask);
259
260            List<Description> descriptions = null;
261            while (response.readStart(OAI_NS, "description", false)) {
262                response.consumeWhitespace();
263                final String namespaceURI = response.readNamespaceURI();
264                final String localName = response.peekElementLocalName();
265                DescriptionParser parser = harvester.findDescriptionParser(
266                        namespaceURI, localName);
267                if (parser != null) {
268                    Description desc = parser.parseDescription(
269                            response.getXMLStreamReader());
270                    if (desc != null) {
271                        if (descriptions == null) {
272                            descriptions = new ArrayList<Description>();
273                        }
274                        descriptions.add(desc);
275                    }
276                } else {
277                    logger.info("skipping <description> for {}", new QName(
278                            namespaceURI, localName));
279                }
280                response.readEnd(OAI_NS, "description", true);
281                job.setDescriptions(descriptions);
282            }
283            response.readEnd(OAI_NS, "Identify");
284            response.close();
285
286            job.onIdentify();
287
288            return true;
289        } finally {
290            if (response != null) {
291                response.release(job);
292            }
293        }
294    }
295
296
297    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
298            throws IOException, XMLStreamException, HarvesterException {
299        Response response = null;
300        try {
301            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
302            response = execute(job, true);
303            if (response == null) {
304                return false;
305            }
306
307            response.readStart(OAI_NS, "ListMetadataFormats", true);
308            List<MetadataFormat> metadataFormats =
309                    new ArrayList<MetadataFormat>();
310            while (response.readStart(OAI_NS, "metadataFormat",
311                    metadataFormats.isEmpty())) {
312                final String prefix =
313                        response.readContent(OAI_NS, "metadataPrefix", true);
314                final String schema =
315                        response.readContent(OAI_NS, "schema", true);
316                final String namespace =
317                        response.readContent(OAI_NS, "metadataNamespace", true);
318                metadataFormats.add(
319                        new MetadataFormatImpl(prefix, schema, namespace));
320                response.readEnd(OAI_NS, "metadataFormat");
321            } // while
322            response.readEnd(OAI_NS, "ListMetadataFormats");
323            response.close();
324
325            job.onListMetadataFormats(metadataFormats);
326
327            return true;
328        } finally {
329            if (response != null) {
330                response.release(job);
331            }
332        }
333    }
334
335
336    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
337            XMLStreamException, HarvesterException {
338        Response response = null;
339        try {
340            String token = job.getResumptionToken();
341            uriBuilder.setVerb(VERB_LIST_RECORDS);
342            if (token == null) {
343                uriBuilder.addParameter(ARG_METADATA_PREFIX,
344                        job.getCurrentPrefix());
345                if (job.getFrom() != null) {
346                    uriBuilder.addParameter("from",
347                            formatDate(job.getGranularity(), job.getFrom()));
348                }
349                if (job.getUntil() != null) {
350                    uriBuilder.addParameter("until",
351                            formatDate(job.getGranularity(), job.getUntil()));
352                }
353                if (job.getSet() != null) {
354                    uriBuilder.addParameter("set", job.getSet());
355                }
356            } else {
357                uriBuilder.addParameter(ARG_RESUMPTION_TOKEN, token);
358                job.incResumptionCount();
359            }
360
361            response = execute(job, true);
362            if (response == null) {
363                return false;
364            }
365
366            response.readStart(OAI_NS, "ListRecords", true);
367            header.reset();
368            boolean first = true;
369            while (response.readStart(OAI_NS, "record", first)) {
370                boolean deleted = false;
371
372                response.readStart(OAI_NS, "header", true, true);
373                String s = response.readAttributeValue(null, "status");
374                if (s != null) {
375                    if ("deleted".equals(s)) {
376                        header.setDeleted(true);
377                        deleted = true;
378                    } else {
379                        throw new XMLStreamException("attribute 'status' of "
380                                + "element 'record' must contain 'deleted'",
381                                response.getLocation());
382                    }
383                }
384                response.consumeStart();
385                s = response.readContent(OAI_NS, "identifier", true);
386                header.setIdentifier(s);
387                s = response.readContent(OAI_NS, "datestamp", true);
388                header.setDatestamp(parseDate(job.getGranularity(), s));
389                while (response.readStart(OAI_NS, "setSpec", false)) {
390                    header.addSetSpec(response.readString(true));
391                    response.readEnd(OAI_NS, "setSpec", true);
392                }
393                response.readEnd(OAI_NS, "header");
394                job.onRecord(header);
395                /*
396                 * only parse metadata from non-deleted records
397                 */
398                if (!deleted) {
399                    if (response.readStart(OAI_NS, "metadata", true)) {
400                        response.consumeWhitespace();
401                        if (!response.getXMLStreamReader().isCharacters()) {
402                            readerWrapper.reset(response.getXMLStreamReader());
403                            job.onRecordMetadata(header, readerWrapper);
404                        } else {
405                            // FIXME: notify handler somehow of failed record
406                            logger.error("invalid metadata record with identifier '{}' contains only characters",
407                                    header.getIdentifier());
408                        }
409                        response.consumeWhitespace();
410                        response.readEnd(OAI_NS, "metadata", true);
411                    }
412                }
413                while (response.readStart(OAI_NS, "about", false)) {
414                    response.consumeWhitespace();
415                    job.onRecordAbout(header, null);
416                    response.consumeWhitespace();
417                    response.readEnd(OAI_NS, "about", true);
418                } // while (about)
419                response.readEnd(OAI_NS, "record");
420                first = false;
421                header.reset();
422            } // while (record)
423
424            /*
425             * expect resumptionToken (possibly empty) when request was resumed
426             */
427            String nextToken = null;
428            boolean got_token = false;
429            if (response.readStart(OAI_NS, "resumptionToken", false, true)) {
430                // XXX: read attributes here!
431                response.consumeStart();
432
433                nextToken = response.readString(false);
434                if (nextToken != null) {
435                    if (!nextToken.isEmpty()) {
436                        job.setResumptionToken(nextToken);
437                    } else {
438                        nextToken = null;
439                    }
440                }
441                response.readEnd(OAI_NS, "resumptionToken");
442                got_token = true;
443            }
444            if ((token != null) && (nextToken == null) && !got_token) {
445                logger.error("An <resumptionToken> element was expected, " +
446                        "but none was found; assuming a complete result list.");
447                logger.error("An OAI provider is required to serialize an " +
448                        "empty <resumptionToken> element to announce that " +
449                        "the result list is complete.");
450            }
451            response.readEnd(OAI_NS, "ListRecords");
452
453            response.close();
454
455            return (nextToken == null);
456        } catch (HarvesterProtocolErrorException e) {
457            /*
458             * XXX: handle NO_RECORDS_MATCH protocol error
459             */
460            List<ProtocolError> errors = e.getErrors();
461            if ((errors != null) && !errors.isEmpty()) {
462                for (ProtocolError error : errors) {
463                    if (error.isCode(ProtocolError.Code.NO_RECORDS_MATCH)) {
464                        logger.debug("no records match prefix '{}'",
465                                job.getCurrentPrefix());
466                        return true;
467                    }
468                }
469            }
470            throw e;
471        } finally {
472            if (response != null) {
473                response.release(job);
474            }
475        }
476    }
477
478
479    private Response execute(HarvestJobImpl job, boolean compress)
480            throws IOException, XMLStreamException, HarvesterException {
481        final long now = System.currentTimeMillis();
482        job.incRequestCount();
483        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
484        logger.debug("Performing HTTP GET request to {}", uri);
485        final HttpGet request = new HttpGet(uri);
486        if (compress) {
487            final int mask = job.getCompressionMask();
488            if ((mask & Repository.COMPRESSION_METHOD_GZIP) > 0) {
489                request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
490            }
491            if ((mask & Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
492                request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
493            }
494            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
495        }
496        request.addHeader(HEADER_CONNECTION, "close");
497
498        /*
499         * FIXME: the following code need serious re-factoring
500         */
501        CloseableHttpResponse response = null;
502        boolean close = true;
503        try {
504            response = client.execute(request, new BasicHttpContext());
505            job.addToNetworkTime(System.currentTimeMillis() - now);
506
507            final int status = response.getStatusLine().getStatusCode();
508            if (status == HttpStatus.SC_OK) {
509                delay = 0;
510                close = false;
511                return new Response(job, response, harvester);
512            } else {
513                if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
514                    delay = parseTryAfter(response);
515                    logger.debug("got service unavailable status, retrying " +
516                            "after {} seconds", delay);
517                    return null;
518                } else {
519                    throw new HarvesterException(
520                            "provider returned unexpected HTTP status: " +
521                                    status);
522                }
523            }
524        } catch (IOException e) {
525            throw e;
526        } catch (XMLStreamException e) {
527            throw e;
528        } catch (HarvesterException e) {
529            throw e;
530        } finally {
531            if (close && (response != null)) {
532                /* make sure. response is closed ... */
533                response.close();
534            }
535        }
536    }
537
538
539    private long parseTryAfter(HttpResponse response) {
540        long delay = -1;
541        final Header s = response.getFirstHeader("Retry-After");
542        if (s != null) {
543            final String v = s.getValue();
544            try {
545                /*
546                 * if plain number, it is the time in seconds
547                 */
548                delay = Integer.parseInt(v) * 1000;
549            } catch (NumberFormatException e) {
550                    Date date = DateUtils.parseDate(v);
551                    if (date != null) {
552                        delay = date.getTime() - System.currentTimeMillis();
553                    }
554            }
555        }
556        return (delay > 0) ? delay : 0;
557    }
558
559
560    private static Date parseDate(Repository.Granularity granularity, String s)
561            throws HarvesterException {
562        final MutableDateTime date = new MutableDateTime(0);
563        for (int i = 0; i < DATEFORMATS.length; i++) {
564            DateTimeFormatter fmt = DateTimeFormat
565                    .forPattern(DATEFORMATS[i])
566                    .withZone(DateTimeZone.UTC);
567            if (fmt.parseInto(date, s, 0) == s.length()) {
568                if (DATEFORMATS[i].equals(DATEFORMAT_SECONDS) &&
569                        (granularity == Repository.Granularity.DAYS)) {
570                    logger.warn("repository announced DAYS granularity but " +
571                            "provided timestamp with SECONDS granularity");
572                }
573                if (DATEFORMATS[i].equals(DATEFORMAT_DAYS)) {
574                    date.setTime(0, 0, 0, 0);
575                }
576                return date.toDate();
577            }
578        }
579        throw new HarvesterException("invalid date: " + s);
580    }
581
582
583    private static String formatDate(Repository.Granularity granularity,
584            Date date) throws HarvesterException {
585        DateTimeFormatter fmt = null;
586        switch (granularity) {
587        case DAYS:
588            fmt = DateTimeFormat.forPattern(DATEFORMAT_DAYS)
589                .withZone(DateTimeZone.UTC);
590            break;
591        case SECONDS:
592            fmt = DateTimeFormat.forPattern(DATEFORMAT_SECONDS)
593                .withZone(DateTimeZone.UTC);
594            break;
595        default:
596            /* cannot happen, but silence FindBugs warning */
597            throw new HarvesterException("invalid granularity: " + granularity);
598        }
599        return fmt.print(new DateTime(date));
600    }
601
602
603    private static CloseableHttpClient createHttpClient(int connectTimeout,
604            int socketTimeout) {
605        final PoolingHttpClientConnectionManager manager =
606                new PoolingHttpClientConnectionManager();
607        manager.setDefaultMaxPerRoute(8);
608        manager.setMaxTotal(128);
609
610        final SocketConfig socketConfig = SocketConfig.custom()
611                .setSoReuseAddress(true)
612                .setSoLinger(0)
613                .build();
614
615        final RequestConfig requestConfig = RequestConfig.custom()
616                .setAuthenticationEnabled(false)
617                .setRedirectsEnabled(true)
618                .setMaxRedirects(4)
619                .setCircularRedirectsAllowed(false)
620                .setCookieSpec(CookieSpecs.IGNORE_COOKIES)
621                .setConnectTimeout(connectTimeout)
622                .setSocketTimeout(socketTimeout)
623                .setConnectionRequestTimeout(0) /* infinite */
624                .setStaleConnectionCheckEnabled(false)
625                .build();
626
627        return HttpClients.custom()
628                .setUserAgent(USER_AGENT)
629                .setConnectionManager(manager)
630                .setDefaultSocketConfig(socketConfig)
631                .setDefaultRequestConfig(requestConfig)
632                .setConnectionReuseStrategy(new NoConnectionReuseStrategy())
633                .build();
634    }
635
636} // class HarvestWorker
Note: See TracBrowser for help on using the repository browser.