source: OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java @ 1140

Last change on this file since 1140 was 1140, checked in by oschonef, 13 years ago
  • make maximum network retry count configurable through Harvester
  • re-naming internal methods
  • Property svn:eol-style set to native
File size: 18.5 KB
Line 
1package eu.clarin.cmdi.oai.harvester.impl;
2
3import java.io.IOException;
4import java.net.URI;
5import java.util.ArrayList;
6import java.util.Date;
7import java.util.List;
8
9import javax.xml.stream.XMLStreamException;
10
11import org.apache.http.Header;
12import org.apache.http.HttpResponse;
13import org.apache.http.HttpStatus;
14import org.apache.http.client.methods.HttpGet;
15import org.apache.http.impl.client.DefaultHttpClient;
16import org.apache.http.params.CoreConnectionPNames;
17import org.apache.http.params.CoreProtocolPNames;
18import org.apache.http.params.HttpParams;
19import org.joda.time.DateTime;
20import org.joda.time.DateTimeZone;
21import org.joda.time.MutableDateTime;
22import org.joda.time.format.DateTimeFormat;
23import org.joda.time.format.DateTimeFormatter;
24
25import eu.clarin.cmdi.oai.harvester.HarvestJob.State;
26import eu.clarin.cmdi.oai.harvester.HarvesterException;
27import eu.clarin.cmdi.oai.harvester.MetadataFormat;
28import eu.clarin.cmdi.oai.harvester.Repository;
29import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion;
30import eu.clarin.cmdi.oai.harvester.Repository.Granularity;
31import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task;
32
33public class HarvestWorker {
34    private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/";
35    private static final String ARG_METADATA_PREFIX = "metadataPrefix";
36    private static final String ARG_RESUMPTION_TOKEN = "resumptionToken";
37    private static final String PROTOCOL_VERSION = "2.0";
38    private static final String DELETED_NOTION_NO = "no";
39    private static final String DELETED_NOTION_PERSISTENT = "persistent";
40    private static final String DELETED_NOTION_TRANSIENT = "transient";
41    private static final String GRANULARITY_DAYS = "YYYY-MM-DD";
42    private static final String GRANULARITY_SECONDS = "YYYY-MM-DDThh:mm:ssZ";
43    private static final String COMPRESSION_DEFLATE = "deflate";
44    private static final String COMPRESSION_GZIP = "gzip";
45    private static final String VERB_IDENTIFY = "Identify";
46    private static final String VERB_LIST_METADATAFORMATS =
47        "ListMetadataFormats";
48    private static final String VERB_LIST_RECORDS = "ListRecords";
49    private static final String HEADER_ACCEPT_ENCODING = "Accept-Encoding";
50    private static final String HEADER_CONNECTION = "Connection";
51    private static final String[] DATEFORMATS_DAYS =
52        { "yyyy'-'MM'-'dd" };
53    private static final String[] DATEFORMATS_FULL =
54        { "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'", "yyyy'-'MM'-'dd" };
55    private final AbstractHarvester harvester;
56    private final DefaultHttpClient client;
57    private final RequestUriBuilder uriBuilder;
58    private final HeaderImpl header;
59    private final XmlStreamReaderProxy readerWrapper;
60    private long delay;
61
62    HarvestWorker(AbstractHarvester harvester) {
63        this.harvester = harvester;
64        // http client
65        client = new DefaultHttpClient();
66        client.getParams().setParameter(CoreProtocolPNames.USER_AGENT,
67                    "eu.clarin.cmdi.oai.Harvester/0.0.1");
68        // request stuff
69        uriBuilder = new RequestUriBuilder();
70        // other stuff
71        this.header = new HeaderImpl();
72        this.readerWrapper = new XmlStreamReaderProxy();
73    }
74
75    long process(HarvestJobImpl job, int maxNetworkTries)
76            throws HarvesterException {
77        if (job.isState(State.CANCELED)) {
78            return -1;
79        }
80        if (!job.isState(State.RUNNING)) {
81            throw new HarvesterException("job is not in running state");
82        }
83
84        boolean ok;
85        try {
86            switch (job.getTask()) {
87            case IDENTIFY_REPOSITORY:
88                ok = doIdentifyTask(job);
89                if (ok) {
90                    job.setTask(Task.ENUMERATE_METADATA_FORMATS);
91                }
92                break;
93            case ENUMERATE_METADATA_FORMATS:
94                ok = doListMetadataFormatsTask(job);
95                if (ok) {
96                    job.setTask(Task.HARVEST_RECORDS);
97                }
98                break;
99            case HARVEST_RECORDS:
100                // check for new prefix and notify start, if needed
101                if (job.isHarvestingNewPrefix()) {
102                    job.onStartListingRecords();
103                }
104                ok = doListRecordsTask(job);
105                if (ok) {
106                    // notify end of prefix
107                    job.onFinishListingRecords();
108                    if (!job.removeCurrentPrefix()) {
109                        job.setState(State.DONE);
110                    }
111                } else if ((job.getResumptionToken() != null) && (delay == 0)) {
112                    /*
113                     * special case: request successful, but list was split
114                     * die to OAI flow control
115                     */
116                    ok = true;
117                }
118                break;
119            default:
120                throw new HarvesterException("internal error");
121            } // switch
122
123            if (ok) {
124                job.resetNetworkRetryCount();
125            } else {
126                if (delay > 0) {
127                    job.setNetworkRequestDelay(delay);
128                }
129                if (job.incNetworkRetryCount() >= maxNetworkTries) {
130                    throw new HarvesterException("too many network retries: " +
131                            job.getNetworkRetryCount());
132                }
133            }
134            return delay;
135        } catch (Throwable e) {
136            job.setState(State.ERROR);
137            if (e instanceof HarvesterException) {
138                throw (HarvesterException) e;
139            } else if (e instanceof XMLStreamException) {
140                throw new HarvesterException("malformed OAI response", e);
141            } else if (e instanceof IOException) {
142                throw new HarvesterException("network error", e);
143            } else {
144                throw new HarvesterException("unexpected exception", e);
145            }
146        }
147    }
148
149    private boolean doIdentifyTask(HarvestJobImpl job) throws IOException,
150            XMLStreamException, HarvesterException {
151        Response response = null;
152        try {
153            uriBuilder.setVerb(VERB_IDENTIFY);
154            response = execute(job, false);
155            if (response == null) {
156                return false;
157            }
158
159            response.readStart(OAI_NS, "Identify", true);
160
161            String s = response.readContent(OAI_NS, "repositoryName", true);
162            job.setRepositoryName(s);
163
164            s = response.readContent(OAI_NS, "baseURL", true);
165            job.setBaseURL(s);
166
167            s = response.readContent(OAI_NS, "protocolVersion", true);
168            if (!PROTOCOL_VERSION.equals(s)) {
169                throw new HarvesterException("unsupported protocol version: " +
170                        s);
171            }
172            job.setProtocolVersion(PROTOCOL_VERSION);
173
174            List<String> adminEmail = null;
175            do {
176                s = response.readContent(OAI_NS, "adminEmail",
177                        (adminEmail == null));
178                if (s != null) {
179                    if (adminEmail == null) {
180                        adminEmail = new ArrayList<String>();
181                    }
182                    adminEmail.add(s);
183                }
184            } while (s != null);
185            job.setAdminEmail(adminEmail);
186
187            s = response.readContent(OAI_NS, "earliestDatestamp", true);
188            job.setEarliestTimestamp(s);
189
190            s = response.readContent(OAI_NS, "deletedRecord", true);
191            if (DELETED_NOTION_NO.equals(s)) {
192                job.setDeletedNotion(DeletedNotion.NO);
193            } else if (DELETED_NOTION_PERSISTENT.equals(s)) {
194                job.setDeletedNotion(DeletedNotion.PERSISTENT);
195            } else if (DELETED_NOTION_TRANSIENT.equals(s)) {
196                job.setDeletedNotion(DeletedNotion.TRANSIENT);
197            } else {
198                throw new HarvesterException(
199                        "invalid value for 'deletedRecord': " + s);
200            }
201
202            s = response.readContent(OAI_NS, "granularity", true);
203            if (GRANULARITY_DAYS.equals(s)) {
204                job.setGranularity(Granularity.DAYS);
205            } else if (GRANULARITY_SECONDS.equals(s)) {
206                job.setGranularity(Granularity.SECONDS);
207            } else {
208                throw new HarvesterException(
209                        "invalid value for 'granularity': " + s);
210            }
211
212            int mask = 0;
213            do {
214                s = response.readContent(OAI_NS, "compression", false);
215                if (s != null) {
216                    if (COMPRESSION_DEFLATE.equals(s)) {
217                        mask |= Repository.COMPRESSION_METHOD_DEFLATE;
218                    } else if (COMPRESSION_GZIP.equals(s)) {
219                        mask |= Repository.COMPRESSION_METHOD_GZIP;
220                    }
221                }
222            } while (s != null);
223            job.setCompressionMask(mask);
224
225            while (response.readStart(OAI_NS, "description", false)) {
226                response.consumeWhitespace();
227                final String namespaceURI = response.readNamespaceURI();
228                System.err.println("DESCRIPTION: URI = " + namespaceURI);
229                response.readEnd(OAI_NS, "description", true);
230            }
231            response.readEnd(OAI_NS, "Identify");
232            response.close();
233           
234            job.onIdentify();
235
236            return true;
237        } finally {
238            if (response != null) {
239                response.release(job);
240            }
241        }
242    }
243
244    private boolean doListMetadataFormatsTask(HarvestJobImpl job)
245            throws IOException, XMLStreamException, HarvesterException {
246        Response response = null;
247        try {
248            uriBuilder.setVerb(VERB_LIST_METADATAFORMATS);
249            response = execute(job, true);
250            if (response == null) {
251                return false;
252            }
253
254            response.readStart(OAI_NS, "ListMetadataFormats", true);
255            List<MetadataFormat> metadataFormats =
256                new ArrayList<MetadataFormat>();
257            while (response.readStart(OAI_NS, "metadataFormat",
258                    metadataFormats.isEmpty())) {
259                final String prefix =
260                    response.readContent(OAI_NS, "metadataPrefix", true);
261                final String schema =
262                    response.readContent(OAI_NS, "schema", true);
263                final String namespace =
264                    response.readContent(OAI_NS, "metadataNamespace", true);
265                metadataFormats.add(
266                        new MetadataFormatImpl(prefix, schema, namespace));
267                response.readEnd(OAI_NS, "metadataFormat");
268            } // while
269            response.readEnd(OAI_NS, "ListMetadataFormats");
270            response.close();
271
272            job.onListMetadataFormats(metadataFormats);
273           
274            return true;
275        } finally {
276            if (response != null) {
277                response.release(job);
278            }
279        }
280    }
281
282    private boolean doListRecordsTask(HarvestJobImpl job) throws IOException,
283            XMLStreamException, HarvesterException {
284        Response response = null;
285        try {
286            String token = job.getResumptionToken();
287            uriBuilder.setVerb(VERB_LIST_RECORDS);
288            if (token == null) {
289                uriBuilder.addParameter(ARG_METADATA_PREFIX,
290                        job.getCurrentPrefix());
291                if (job.getFrom() != null) {
292                    uriBuilder.addParameter("from",
293                            formatDate(job.getGranularity(), job.getFrom()));
294                }
295                if (job.getUntil() != null) {
296                    uriBuilder.addParameter("until",
297                            formatDate(job.getGranularity(), job.getUntil()));
298                }
299                if (job.getSet() != null) {
300                    uriBuilder.addParameter("set", job.getSet());
301                }
302            } else {
303                uriBuilder.addParameter(ARG_RESUMPTION_TOKEN, token);
304                job.incResumptionCount();
305            }
306
307            response = execute(job, true);
308            if (response == null) {
309                return false;
310            }
311
312            String nextToken = null;
313
314            response.readStart(OAI_NS, "ListRecords", true);
315            header.reset();
316            boolean first = true;
317            while (response.readStart(OAI_NS, "record", first)) {
318                response.readStart(OAI_NS, "header", true, true);
319                String s = response.readAttributeValue(null, "status");
320                if (s != null) {
321                    if ("deleted".equals(s)) {
322                        header.setDeleted(true);
323                    } else {
324                        throw new XMLStreamException("attribute 'status' of " +
325                                "element 'record' must contain 'deleted'");
326                    }
327                }
328                response.consumeStart();
329                s = response.readContent(OAI_NS, "identifier", true);
330                header.setIdentifier(s);
331                s = response.readContent(OAI_NS, "datestamp", true);
332                header.setDatestamp(parseDate(job.getGranularity(), s));
333                while (response.readStart(OAI_NS, "setSpec", false)) {
334                    header.addSetSpec(response.readString(true));
335                    response.readEnd(OAI_NS, "setSpec", true);
336                }
337                response.readEnd(OAI_NS, "header");
338                job.onRecord(header);
339                if (response.readStart(OAI_NS, "metadata", true)) {
340                    response.consumeWhitespace();
341                    readerWrapper.reset(response.getXMLStreamReader());
342                    job.onRecordMetadata(header, readerWrapper);
343                    response.consumeWhitespace();
344                    response.readEnd(OAI_NS, "metadata");
345                }
346                while (response.readStart(OAI_NS, "about", false)) {
347                    response.consumeWhitespace();
348                    job.onRecordAbout(header, null);
349                    response.consumeWhitespace();
350                    response.readEnd(OAI_NS, "about", true);
351                } // while (about)
352                response.readEnd(OAI_NS, "record");
353                first = false;
354                header.reset();
355            } // while (record)
356
357            /*
358             * expect resumptionToken (possibly empty) when request was resumed
359             */
360            if (response.readStart(OAI_NS, "resumptionToken",
361                    (token != null), true)) {
362                // XXX: read attributes here!
363                response.consumeStart();
364
365                nextToken = response.readString(false);
366                if (nextToken != null) {
367                    job.setResumptionToken(nextToken);
368                }
369                response.readEnd(OAI_NS, "resumptionToken");
370            }
371            response.readEnd(OAI_NS, "ListRecords");
372           
373            response.close();
374
375            return (nextToken == null);
376        } finally {
377            if (response != null) {
378                response.release(job);
379            }
380        }
381    }
382
383    private Response execute(HarvestJobImpl job, boolean compress)
384            throws IOException, XMLStreamException, HarvesterException {
385        long now = System.currentTimeMillis();
386        job.incRequestCount();
387        final URI uri = uriBuilder.createURI(job.getRepositoryURI());
388        System.err.println("HTTP GET " + uri);
389        final HttpGet request = new HttpGet(uri);
390        if (compress) {
391            if ((job.getCompressionMask() &
392                    Repository.COMPRESSION_METHOD_GZIP) > 0) {
393                request.addHeader(HEADER_ACCEPT_ENCODING, "gzip");
394            }
395            if ((job.getCompressionMask() &
396                    Repository.COMPRESSION_METHOD_DEFLATE) > 0) {
397                request.addHeader(HEADER_ACCEPT_ENCODING, "deflate");
398            }
399            request.addHeader(HEADER_ACCEPT_ENCODING, "identity");
400        }
401        request.addHeader(HEADER_CONNECTION, "close");
402        // FIXME: get timeout values from somewhere?
403        HttpParams params = request.getParams();
404        params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000);
405        params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000);
406
407        final HttpResponse response = client.execute(request);
408        job.addToNetworkTime(System.currentTimeMillis() - now);
409        int status = response.getStatusLine().getStatusCode();
410        if (status == HttpStatus.SC_OK) {
411            delay = 0;
412            return new Response(job, response.getEntity(), harvester);
413        } else {
414            request.abort();
415            if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) {
416                System.err.println("-> " + status);
417                delay = parseTryAfter(response);
418            } else {
419                throw new HarvesterException(
420                        "provider returned unexpected HTTP status: " + status);
421            }
422        }
423        return null;
424    }
425
426    private long parseTryAfter(HttpResponse response) {
427        final Header s = response.getFirstHeader("Retry-After");
428        if (s != null) {
429            final String v = s.getValue();
430            try {
431                return Long.parseLong(v);
432            } catch (NumberFormatException e) {
433                /* IGNORE */
434            }
435        }
436        return 1000;
437    }
438
439    private Date parseDate(Repository.Granularity granularity, String s)
440            throws HarvesterException {
441        String[] patterns = null;
442        switch (granularity) {
443        case DAYS:
444            patterns = DATEFORMATS_DAYS;
445            break;
446        case SECONDS:
447            patterns = DATEFORMATS_FULL;
448        }
449        MutableDateTime date = new MutableDateTime(0);
450        for (int i = 0; i < patterns.length; i++) {
451            DateTimeFormatter fmt = DateTimeFormat
452                .forPattern(patterns[i])
453                .withZone(DateTimeZone.UTC);
454            if (fmt.parseInto(date, s, 0) == s.length()) {
455                if (patterns[i].equals(DATEFORMATS_DAYS[0])) {
456                    date.setTime(0, 0, 0, 0);
457                }
458                return date.toDate();
459            }
460        }
461        throw new HarvesterException("invalid date: " + s);
462    }
463
464    private String formatDate(Repository.Granularity granularity, Date date) {
465        String pattern = null;
466        switch (granularity) {
467        case DAYS:
468            pattern = DATEFORMATS_DAYS[0];
469            break;
470        case SECONDS:
471            pattern = DATEFORMATS_FULL[0];
472        }
473        DateTimeFormatter fmt =
474            DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.UTC);
475        return fmt.print(new DateTime(date));
476    }
477
478} // class HarvestWorker
Note: See TracBrowser for help on using the repository browser.