package eu.clarin.cmdi.oai.harvester.impl;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;

import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.DateUtils;
import org.apache.http.config.SocketConfig;
import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.BasicHttpContext;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.MutableDateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.clarin.cmdi.oai.harvester.Description;
import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
import eu.clarin.cmdi.oai.harvester.HarvesterException;
import eu.clarin.cmdi.oai.harvester.HarvesterProtocolErrorException;
import eu.clarin.cmdi.oai.harvester.MetadataFormat;
import eu.clarin.cmdi.oai.harvester.ProtocolError;
import eu.clarin.cmdi.oai.harvester.Repository;
import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser;
import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;

public class HarvestWorker {
    private static final String USER_AGENT = "OAI-Harvester/1.0.0";
    private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/";
    private static final String ARG_METADATA_PREFIX = "metadataPrefix";
    private static final String ARG_RESUMPTION_TOKEN = "resumptionToken";
    private static final String PROTOCOL_VERSION = "2.0";
    private static final String DELETED_NOTION_NO = "no";
    private static final String DELETED_NOTION_PERSISTENT = "persistent";
    private static final String DELETED_NOTION_TRANSIENT = "transient";
    private static final String GRANULARITY_DAYS = "YYYY-MM-DD";
    private static final String GRANULARITY_SECONDS = "YYYY-MM-DDThh:mm:ssZ";
    private static final String COMPRESSION_DEFLATE = "deflate";
    private static final String COMPRESSION_GZIP = "gzip";
    private static final String VERB_IDENTIFY = "Identify";
    private static final String VERB_LIST_METADATAFORMATS =
            "ListMetadataFormats";
    private static final String VERB_LIST_RECORDS = "ListRecords";
    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
    private static final String HEADER_CONNECTION = "Connection";
    private static final String DATEFORMAT_SECONDS =
            "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'";
    private static final String DATEFORMAT_DAYS = "yyyy'-'MM'-'dd";
    private static final String[] DATEFORMATS =
            { DATEFORMAT_SECONDS, DATEFORMAT_DAYS };
    private static final Logger logger =
            LoggerFactory.getLogger(HarvestWorker.class);

    private final AbstractHarvester harvester;
    private final CloseableHttpClient client;
    private final RequestUriBuilder uriBuilder;
    private final HeaderImpl header;
    private final XmlStreamReaderProxy readerWrapper;
    private long delay;

    HarvestWorker(AbstractHarvester harvester) {
        this.harvester = harvester;
        // create HTTP client
        // FIXME: get timeout values from somewhere?
        final int connectTimeout = 30 * 1000;
        final int socketTimeout = 180 * 1000;
        client = createHttpClient(connectTimeout, socketTimeout);
        // request stuff
        uriBuilder = new RequestUriBuilder();
        // other stuff
        this.header = new HeaderImpl();
        this.readerWrapper = new XmlStreamReaderProxy();
    }
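    /**
     * Performs the next unit of work for the given job and advances its task
     * state machine (Identify, ListMetadataFormats, ListRecords). Returns the
     * delay in milliseconds the caller should wait before invoking this
     * method again, or -1 if the job was canceled.
     */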
    long process(HarvestJobImpl job, int maxNetworkTries)
            throws HarvesterException {
        if (job.isState(State.CANCELED)) {
            return -1;
        }
        if (!job.isState(State.RUNNING)) {
            throw new HarvesterException("job is not in running state");
        }
        boolean ok;
        try {
            switch (job.getTask()) {
            case IDENTIFY_REPOSITORY:
                ok = doIdentifyTask(job);
                if (ok) {
                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
                }
                break;
            case ENUMERATE_METADATA_FORMATS:
                ok = doListMetadataFormatsTask(job);
                if (ok) {
                    job.setTask(Task.HARVEST_RECORDS);
                }
                break;
            case HARVEST_RECORDS:
                // check for new prefix and notify start, if needed
                if (job.isHarvestingNewPrefix()) {
                    job.onStartListingRecords();
                }
                ok = doListRecordsTask(job);
                if (ok) {
                    // notify end of prefix
                    job.onFinishListingRecords();
                    if (!job.removeCurrentPrefix()) {
                        job.setState(State.DONE);
                    }
                } else if ((job.getResumptionToken() != null) && (delay == 0)) {
                    /*
                     * special case: request successful, but list was split due
                     * to OAI flow control
                     */
                    ok = true;
                }
                break;
            default:
                throw new HarvesterException("internal error");
            } // switch
            if (ok) {
                job.resetNetworkRetryCount();
            } else {
                if (delay > 0) {
                    job.setNetworkRequestDelay(delay);
                }
                if (job.incNetworkRetryCount() >= maxNetworkTries) {
                    throw new HarvesterException("too many network retries: "
                            + job.getNetworkRetryCount());
                }
            }
            return delay;
        } catch (Throwable e) {
            job.setState(State.ERROR);
            if (e instanceof HarvesterException) {
                throw (HarvesterException) e;
            } else if (e instanceof XMLStreamException) {
                throw new HarvesterException("malformed OAI response", e);
            } else if (e instanceof IOException) {
                throw new HarvesterException("network error", e);
            } else {
                throw new HarvesterException("unexpected exception", e);
            }
        }
    }
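    /**
     * Issues an "Identify" request and populates the job with the repository
     * properties: name, base URL, protocol version, admin e-mail addresses,
     * earliest datestamp, deleted-record notion, datestamp granularity,
     * supported compression methods and repository descriptions.
     */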
    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
            XMLStreamException, HarvesterException {
        Response response = null;
        try {
            uriBuilder.setVerb(VERB_IDENTIFY);
            response = execute(job, false);
            if (response == null) {
                return false;
            }
            response.readStart(OAI_NS, "Identify", true);
            String s = response.readContent(OAI_NS, "repositoryName", true);
            job.setRepositoryName(s);
            s = response.readContent(OAI_NS, "baseURL", true);
            job.setBaseURL(s);
            s = response.readContent(OAI_NS, "protocolVersion", true);
            if (!PROTOCOL_VERSION.equals(s)) {
                throw new HarvesterException(
                        "unsupported protocol version: " + s);
            }
            job.setProtocolVersion(PROTOCOL_VERSION);
            List<String> adminEmail = null;
            do {
                s = response.readContent(OAI_NS, "adminEmail",
                        (adminEmail == null));
                if (s != null) {
                    if (adminEmail == null) {
                        adminEmail = new ArrayList<String>();
                    }
                    adminEmail.add(s);
                }
            } while (s != null);
            job.setAdminEmail(adminEmail);
            /*
             * defer setting of earliestDatestamp in job until we know the
             * datestamp granularity of the repository
             */
            final String earliestDatestamp = response.readContent(OAI_NS,
                    "earliestDatestamp", true);
            s = response.readContent(OAI_NS, "deletedRecord", true);
            if (DELETED_NOTION_NO.equals(s)) {
                job.setDeletedNotion(DeletedNotion.NO);
            } else if (DELETED_NOTION_PERSISTENT.equals(s)) {
                job.setDeletedNotion(DeletedNotion.PERSISTENT);
            } else if (DELETED_NOTION_TRANSIENT.equals(s)) {
                job.setDeletedNotion(DeletedNotion.TRANSIENT);
            } else {
                throw new HarvesterException(
                        "invalid value for 'deletedRecord': " + s);
            }
            s = response.readContent(OAI_NS, "granularity", true);
            Granularity granularity = null;
            if (GRANULARITY_DAYS.equals(s)) {
                granularity = Granularity.DAYS;
            } else if (GRANULARITY_SECONDS.equals(s)) {
                granularity = Granularity.SECONDS;
            } else {
                throw new HarvesterException(
                        "invalid value for 'granularity': " + s);
            }
            job.setGranularity(granularity);
            job.setEarliestTimestamp(parseDate(granularity, earliestDatestamp));
            int mask = 0;
            do {
                s = response.readContent(OAI_NS, "compression", false, false);
                if (s != null) {
                    if (s.isEmpty()) {
                        logger.warn("skipping empty 'compression' element "
                                + "in Identify response");
                    } else if (COMPRESSION_DEFLATE.equals(s)) {
                        mask |= Repository.COMPRESSION_METHOD_DEFLATE;
                    } else if (COMPRESSION_GZIP.equals(s)) {
                        mask |= Repository.COMPRESSION_METHOD_GZIP;
                    } else {
                        logger.info("ignoring unsupported compression "
                                + "method '{}'", s);
                    }
                }
            } while (s != null);
            job.setCompressionMask(mask);
            List<Description> descriptions = null;
            while (response.readStart(OAI_NS, "description", false)) {
                response.consumeWhitespace();
                final String namespaceURI = response.readNamespaceURI();
                final String localName = response.peekElementLocalName();
                DescriptionParser parser = harvester.findDescriptionParser(
                        namespaceURI, localName);
                if (parser != null) {
                    Description desc = parser.parseDescription(
                            response.getXMLStreamReader());
                    if (desc != null) {
                        if (descriptions == null) {
                            descriptions = new ArrayList<Description>();
                        }
                        descriptions.add(desc);
                    }
                } else {
                    logger.info("skipping description for {}",
                            new QName(namespaceURI, localName));
                }
                response.readEnd(OAI_NS, "description", true);
            }
            job.setDescriptions(descriptions);
            response.readEnd(OAI_NS, "Identify");
            response.close();
            job.onIdentify();
            return true;
        } finally {
            if (response != null) {
                response.release(job);
            }
        }
    }

    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
            throws IOException, XMLStreamException, HarvesterException {
        Response response = null;
        try {
            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
            response = execute(job, true);
            if (response == null) {
                return false;
            }
            response.readStart(OAI_NS, "ListMetadataFormats", true);
            List<MetadataFormat> metadataFormats =
                    new ArrayList<MetadataFormat>();
            while (response.readStart(OAI_NS, "metadataFormat",
                    metadataFormats.isEmpty())) {
                final String prefix = response.readContent(OAI_NS,
                        "metadataPrefix", true);
                final String schema = response.readContent(OAI_NS,
                        "schema", true);
                final String namespace = response.readContent(OAI_NS,
                        "metadataNamespace", true);
                metadataFormats.add(
                        new MetadataFormatImpl(prefix, schema, namespace));
                response.readEnd(OAI_NS, "metadataFormat");
            } // while
            response.readEnd(OAI_NS, "ListMetadataFormats");
            response.close();
            job.onListMetadataFormats(metadataFormats);
            return true;
        } finally {
            if (response != null) {
                response.release(job);
            }
        }
    }
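    /**
     * Issues a "ListRecords" request, or resumes an earlier one via the
     * stored resumption token, and feeds the parsed record headers and
     * metadata to the job. Returns true when the result list is complete,
     * i.e. no further resumption request is needed.
     */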
    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
            XMLStreamException, HarvesterException {
        Response response = null;
        try {
            String token = job.getResumptionToken();
            uriBuilder.setVerb(VERB_LIST_RECORDS);
            if (token == null) {
                uriBuilder.addParameter(ARG_METADATA_PREFIX,
                        job.getCurrentPrefix());
                if (job.getFrom() != null) {
                    uriBuilder.addParameter("from",
                            formatDate(job.getGranularity(), job.getFrom()));
                }
                if (job.getUntil() != null) {
                    uriBuilder.addParameter("until",
                            formatDate(job.getGranularity(), job.getUntil()));
                }
                if (job.getSet() != null) {
                    uriBuilder.addParameter("set", job.getSet());
                }
            } else {
                uriBuilder.addParameter(ARG_RESUMPTION_TOKEN, token);
                job.incResumptionCount();
            }
            response = execute(job, true);
            if (response == null) {
                return false;
            }
            response.readStart(OAI_NS, "ListRecords", true);
            header.reset();
            boolean first = true;
            while (response.readStart(OAI_NS, "record", first)) {
                boolean deleted = false;
                response.readStart(OAI_NS, "header", true, true);
                String s = response.readAttributeValue(null, "status");
                if (s != null) {
                    if ("deleted".equals(s)) {
                        header.setDeleted(true);
                        deleted = true;
                    } else {
                        throw new XMLStreamException("attribute 'status' of "
                                + "element 'header' must contain 'deleted'",
                                response.getLocation());
                    }
                }
                response.consumeStart();
                s = response.readContent(OAI_NS, "identifier", true);
                header.setIdentifier(s);
                s = response.readContent(OAI_NS, "datestamp", true);
                header.setDatestamp(parseDate(job.getGranularity(), s));
                while (response.readStart(OAI_NS, "setSpec", false)) {
                    header.addSetSpec(response.readString(true));
                    response.readEnd(OAI_NS, "setSpec", true);
                }
                response.readEnd(OAI_NS, "header");
                job.onRecord(header);
                /*
                 * only parse metadata from non-deleted records
                 */
                if (!deleted) {
                    if (response.readStart(OAI_NS, "metadata", true)) {
                        response.consumeWhitespace();
                        if (!response.getXMLStreamReader().isCharacters()) {
                            readerWrapper.reset(response.getXMLStreamReader());
                            job.onRecordMetadata(header, readerWrapper);
                        } else {
                            // FIXME: notify handler somehow of failed record
                            logger.error("invalid metadata record with "
                                    + "identifier '{}' contains only "
                                    + "characters", header.getIdentifier());
                        }
                        response.consumeWhitespace();
                        response.readEnd(OAI_NS, "metadata", true);
                    }
                }
                while (response.readStart(OAI_NS, "about", false)) {
                    response.consumeWhitespace();
                    job.onRecordAbout(header, null);
                    response.consumeWhitespace();
                    response.readEnd(OAI_NS, "about", true);
                } // while (about)
                response.readEnd(OAI_NS, "record");
                first = false;
                header.reset();
            } // while (record)
            /*
             * expect resumptionToken (possibly empty) when request was resumed
             */
            String nextToken = null;
            boolean gotToken = false;
            if (response.readStart(OAI_NS, "resumptionToken", false, true)) {
                // XXX: read attributes here!
                response.consumeStart();
                nextToken = response.readString(false);
                if (nextToken != null) {
                    if (!nextToken.isEmpty()) {
                        job.setResumptionToken(nextToken);
                    } else {
                        nextToken = null;
                    }
                }
                response.readEnd(OAI_NS, "resumptionToken");
                gotToken = true;
            }
            if ((token != null) && (nextToken == null) && !gotToken) {
                logger.error("a 'resumptionToken' element was expected, "
                        + "but none was found; assuming a complete result "
                        + "list");
                logger.error("an OAI provider is required to serialize an "
                        + "empty 'resumptionToken' element to announce that "
                        + "the result list is complete");
            }
            response.readEnd(OAI_NS, "ListRecords");
            response.close();
            return (nextToken == null);
        } catch (HarvesterProtocolErrorException e) {
            /*
             * XXX: handle NO_RECORDS_MATCH protocol error
             */
            List<ProtocolError> errors = e.getErrors();
            if ((errors != null) && !errors.isEmpty()) {
                for (ProtocolError error : errors) {
                    if (error.isCode(ProtocolError.Code.NO_RECORDS_MATCH)) {
                        logger.debug("no records match prefix '{}'",
                                job.getCurrentPrefix());
                        return true;
                    }
                }
            }
            throw e;
        } finally {
            if (response != null) {
                response.release(job);
            }
        }
    }
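    /**
     * Performs the actual HTTP GET request for the current verb. Returns a
     * Response wrapper on HTTP 200, or null on HTTP 503 after recording the
     * retry delay announced by the provider; any other status is an error.
     */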
    private Response execute(HarvestJobImpl job, boolean compress)
            throws IOException, XMLStreamException, HarvesterException {
        final long now = System.currentTimeMillis();
        job.incRequestCount();
        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
        logger.debug("performing HTTP GET request to {}", uri);
        final HttpGet request = new HttpGet(uri);
        if (compress) {
            final int mask = job.getCompressionMask();
            if ((mask & Repository.COMPRESSION_METHOD_GZIP) > 0) {
                request.addHeader(HEADER_ACCEPT_ENCODING, COMPRESSION_GZIP);
            }
            if ((mask & Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
                request.addHeader(HEADER_ACCEPT_ENCODING, COMPRESSION_DEFLATE);
            }
            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
        }
        request.addHeader(HEADER_CONNECTION, "close");
        /*
         * FIXME: the following code needs serious re-factoring
         */
        CloseableHttpResponse response = null;
        boolean close = true;
        try {
            response = client.execute(request, new BasicHttpContext());
            job.addToNetworkTime(System.currentTimeMillis() - now);
            final int status = response.getStatusLine().getStatusCode();
            if (status == HttpStatus.SC_OK) {
                delay = 0;
                close = false;
                return new Response(job, response, harvester);
            } else if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
                delay = parseTryAfter(response);
                logger.debug("got service unavailable status, retrying "
                        + "after {} seconds", delay);
                return null;
            } else {
                throw new HarvesterException(
                        "provider returned unexpected HTTP status: " + status);
            }
        } finally {
            if (close && (response != null)) {
                /* make sure the response is closed */
                response.close();
            }
        }
    }

    private long parseTryAfter(HttpResponse response) {
        long delay = -1;
        final Header s = response.getFirstHeader("Retry-After");
        if (s != null) {
            final String v = s.getValue();
            try {
                /*
                 * if plain number, it is the time in seconds
                 */
                delay = Long.parseLong(v) * 1000;
            } catch (NumberFormatException e) {
                Date date = DateUtils.parseDate(v);
                if (date != null) {
                    delay = date.getTime() - System.currentTimeMillis();
                }
            }
        }
        return (delay > 0) ? delay : 0;
    }
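    /**
     * Parses an OAI datestamp in either SECONDS or DAYS granularity, warning
     * when the repository supplies finer-grained timestamps than the
     * granularity it announced in its Identify response.
     */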
    private static Date parseDate(Repository.Granularity granularity, String s)
            throws HarvesterException {
        final MutableDateTime date = new MutableDateTime(0);
        for (int i = 0; i < DATEFORMATS.length; i++) {
            DateTimeFormatter fmt = DateTimeFormat
                    .forPattern(DATEFORMATS[i])
                    .withZone(DateTimeZone.UTC);
            if (fmt.parseInto(date, s, 0) == s.length()) {
                if (DATEFORMATS[i].equals(DATEFORMAT_SECONDS) &&
                        (granularity == Repository.Granularity.DAYS)) {
                    logger.warn("repository announced DAYS granularity but "
                            + "provided timestamp with SECONDS granularity");
                }
                if (DATEFORMATS[i].equals(DATEFORMAT_DAYS)) {
                    date.setTime(0, 0, 0, 0);
                }
                return date.toDate();
            }
        }
        throw new HarvesterException("invalid date: " + s);
    }

    private static String formatDate(Repository.Granularity granularity,
            Date date) throws HarvesterException {
        DateTimeFormatter fmt = null;
        switch (granularity) {
        case DAYS:
            fmt = DateTimeFormat.forPattern(DATEFORMAT_DAYS)
                    .withZone(DateTimeZone.UTC);
            break;
        case SECONDS:
            fmt = DateTimeFormat.forPattern(DATEFORMAT_SECONDS)
                    .withZone(DateTimeZone.UTC);
            break;
        default:
            /* cannot happen, but silence FindBugs warning */
            throw new HarvesterException("invalid granularity: "
                    + granularity);
        }
        return fmt.print(new DateTime(date));
    }

    private static CloseableHttpClient createHttpClient(int connectTimeout,
            int socketTimeout) {
        final PoolingHttpClientConnectionManager manager =
                new PoolingHttpClientConnectionManager();
        manager.setDefaultMaxPerRoute(8);
        manager.setMaxTotal(128);
        final SocketConfig socketConfig = SocketConfig.custom()
                .setSoReuseAddress(true)
                .setSoLinger(0)
                .build();
        final RequestConfig requestConfig = RequestConfig.custom()
                .setAuthenticationEnabled(false)
                .setRedirectsEnabled(true)
                .setMaxRedirects(4)
                .setCircularRedirectsAllowed(false)
                .setCookieSpec(CookieSpecs.IGNORE_COOKIES)
                .setConnectTimeout(connectTimeout)
                .setSocketTimeout(socketTimeout)
                .setConnectionRequestTimeout(0) /* infinite */
                .setStaleConnectionCheckEnabled(false)
                .build();
        return HttpClients.custom()
                .setUserAgent(USER_AGENT)
                .setConnectionManager(manager)
                .setDefaultSocketConfig(socketConfig)
                .setDefaultRequestConfig(requestConfig)
                .setConnectionReuseStrategy(new NoConnectionReuseStrategy())
                .build();
    }
} // class HarvestWorker