Changeset 3011 for OAIHarvester
- Timestamp: 06/10/13 19:50:23 (11 years ago)
- Location: OAIHarvester/trunk/OAIHarvester
- Files: 10 edited
Legend:
- Unmodified
- Added
- Removed
-
OAIHarvester/trunk/OAIHarvester/pom.xml
r1905 r3011 61 61 <groupId>org.apache.httpcomponents</groupId> 62 62 <artifactId>httpclient</artifactId> 63 <version>4. 1.3</version>63 <version>4.2.5</version> 64 64 </dependency> 65 65 … … 67 67 <groupId>org.codehaus.woodstox</groupId> 68 68 <artifactId>woodstox-core-lgpl</artifactId> 69 <version>4. 1.3</version>69 <version>4.2.0</version> 70 70 </dependency> 71 71 … … 85 85 <dependency> 86 86 <groupId>org.slf4j</groupId> 87 <artifactId>slf4j- jdk14</artifactId>87 <artifactId>slf4j-log4j12</artifactId> 88 88 <version>${slf4j.version}</version> 89 89 <scope>test</scope> … … 98 98 <maven.compiler.target>1.6</maven.compiler.target> 99 99 <!-- versions of common dependencies --> 100 <slf4j.version>1. 6.4</slf4j.version>100 <slf4j.version>1.7.2</slf4j.version> 101 101 </properties> 102 102 -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/ProtocolError.java
r1128 r3011 15 15 public static Code fromString(String s) { 16 16 if ("cannotDisseminateFormat".equals(s)) { 17 return CANNOT_DISSERMINATE_FORMAT;17 return CANNOT_DISSERMINATE_FORMAT; 18 18 } else if ("idDoesNotExist".equals(s)) { 19 19 return ID_DOES_NOR_EXIST; … … 34 34 } 35 35 } 36 } 36 } // enum Code 37 37 38 38 private final Code code; 39 39 private final String message; 40 40 41 41 42 public ProtocolError(String code, String message) { 42 this.code = Code.fromString(code);43 this.code = Code.fromString(code); 43 44 this.message = message; 44 45 } 46 45 47 46 48 public Code getCode() { 47 49 return code; 48 50 } 49 51 52 50 53 public String getMessage() { 51 54 return message; 52 55 } 56 53 57 54 58 public String toString() { … … 56 60 } 57 61 62 public boolean isCode(Code code) { 63 if (code == null) { 64 throw new NullPointerException("code == null"); 65 } 66 return code.equals(this.code); 67 } 68 58 69 } // class ProtocolError -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/AbstractHarvester.java
r1903 r3011 7 7 import java.util.List; 8 8 9 import javax.xml.namespace.QName; 9 10 import javax.xml.stream.XMLInputFactory; 10 11 import javax.xml.stream.XMLStreamException; 11 import javax.xml.stream.XMLStreamReader;12 12 13 13 import org.codehaus.stax2.XMLInputFactory2; … … 22 22 import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser; 23 23 24 24 25 abstract class AbstractHarvester implements Harvester { 25 26 private static final int DEFAULT_MAX_NETWORK_RETRY_COUNT = 5; 26 27 protected int maxNetworkRetryCount = DEFAULT_MAX_NETWORK_RETRY_COUNT; 27 private final XMLInputFactory factory;28 //private final XMLValidationSchema schema;28 private final XMLInputFactory2 factory; 29 // private final XMLValidationSchema schema; 29 30 private List<DescriptionParser> descriptionParsers = 30 new ArrayList<DescriptionParser>(); 31 new ArrayList<DescriptionParser>(); 32 31 33 32 34 protected AbstractHarvester() { 33 factory = XMLInputFactory2.newInstance();35 factory = (XMLInputFactory2) XMLInputFactory.newInstance(); 34 36 35 37 // Stax settings … … 54 56 } 55 57 58 59 @Override 56 60 public final void registerDescriptionParser(DescriptionParser parser) 57 61 throws HarvesterException { … … 61 65 if (findDescriptionParser(parser.getNamespaceURI(), 62 66 parser.getLocalName()) != null) { 63 throw new HarvesterException("description parser for ' {" +64 parser.getNamespaceURI() + "}" + parser.getLocalName() +65 "' was already registered");67 throw new HarvesterException("description parser for '" + 68 new QName(parser.getNamespaceURI(), parser.getLocalName()) + 69 "' was already registered"); 66 70 } 67 71 if (descriptionParsers == null) { … … 70 74 descriptionParsers.add(parser); 71 75 } 72 76 77 78 @Override 73 79 public abstract HarvestJob createJob(URI repositoryURI, 74 80 HarvestHandler handler) throws HarvesterException; 75 81 82 83 @Override 76 84 public final HarvestJob createJob(URI repositoryURI) 77 85 throws HarvesterException { 78 86 return 
createJob(repositoryURI, null); 79 87 } 80 88 89 81 90 @Override 82 91 public final HarvestJob createJob(String repositoryURI, … … 89 98 } 90 99 100 91 101 @Override 92 102 public final HarvestJob createJob(String repositoryURI) … … 95 105 } 96 106 107 97 108 @Override 98 109 public final HarvestJob findJob(long id) throws HarvesterException { 99 110 return doFindJob(id); 100 111 } 112 101 113 102 114 @Override … … 109 121 } 110 122 123 111 124 @Override 112 125 public final int getMaxNetworkRetryCount() { 113 126 return maxNetworkRetryCount; 114 127 } 115 128 129 116 130 @Override 117 131 public final void setMaxNetworkRetryCount(int maxNetworkRetryCount) { … … 123 137 this.maxNetworkRetryCount = maxNetworkRetryCount; 124 138 } 139 125 140 126 141 DescriptionParser findDescriptionParser(String namespaceURI, … … 137 152 } 138 153 154 139 155 abstract void doRunJob(HarvestJobImpl job) throws HarvesterException; 156 140 157 141 158 abstract void doCancelJob(HarvestJobImpl job) throws HarvesterException; 142 159 143 protected abstract HarvestJob doFindJob(long id)144 throws HarvesterException;145 160 146 protected final XMLStreamReader createReader(InputStream in) 161 protected abstract HarvestJob doFindJob(long id) throws HarvesterException; 162 163 164 protected final XMLStreamReader2 createReader(InputStream in) 147 165 throws XMLStreamException { 148 166 XMLStreamReader2 reader = 149 (XMLStreamReader2) factory.createXMLStreamReader(in);167 (XMLStreamReader2) factory.createXMLStreamReader(in); 150 168 // reader.validateAgainst(schema); 151 169 return reader; -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestJobImpl.java
r1163 r3011 23 23 import eu.clarin.cmdi.oai.harvester.Repository.Granularity; 24 24 25 25 26 final class HarvestJobImpl implements HarvestJob, Delayed { 26 27 public enum Task { 27 IDENTIFY_REPOSITORY, 28 ENUMERATE_METADATA_FORMATS, 29 HARVEST_RECORDS 30 } 28 IDENTIFY_REPOSITORY, ENUMERATE_METADATA_FORMATS, HARVEST_RECORDS 29 } 30 31 31 private static final class StatisticsImpl implements Statistics { 32 32 private final String prefix; … … 36 36 private long bytedTransferred = 0; 37 37 private Date latestDatestamp = null; 38 38 39 39 40 private StatisticsImpl(String prefix) { 40 41 this.prefix = prefix; 41 42 } 43 42 44 43 45 @Override … … 46 48 } 47 49 50 48 51 @Override 49 52 public long getRecordCount() { … … 51 54 } 52 55 56 53 57 private void incRecordCount() { 54 58 this.recordCount++; 55 59 } 60 56 61 57 62 @Override … … 60 65 } 61 66 67 62 68 private void incRequestCount() { 63 69 this.requestCount++; 64 70 } 71 65 72 66 73 @Override … … 69 76 } 70 77 78 71 79 private void incResumptionCount() { 72 80 this.resumptionCount++; 73 81 } 82 74 83 75 84 @Override … … 77 86 return bytedTransferred; 78 87 } 79 88 89 80 90 private void incBytesTransferred(long bytes) { 81 91 this.bytedTransferred += bytes; 82 92 } 93 83 94 84 95 @Override … … 86 97 return latestDatestamp; 87 98 } 88 99 100 89 101 private void updateLatestDatestmp(Date latestDatestamp) { 90 102 if (this.latestDatestamp == null) { … … 97 109 } 98 110 } // inner class Statistics 111 99 112 private final AbstractHarvester harvester; 100 113 private final long id; … … 131 144 private String resumptionToken = null; 132 145 146 133 147 HarvestJobImpl(AbstractHarvester harvester, long id, URI repositoryURI, 134 148 HarvestHandler handler) { 135 this.harvester = harvester;136 this.id = id;149 this.harvester = harvester; 150 this.id = id; 137 151 this.repositoryURI = repositoryURI; 138 this.handler = handler; 139 } 152 this.handler = handler; 153 } 154 140 155 141 156 @Override … … 144 159 } 145 160 161 146 
162 @Override 147 163 public State getState() { … … 149 165 } 150 166 167 151 168 @Override 152 169 public boolean isRunning() { … … 154 171 } 155 172 173 156 174 @Override 157 175 public long getTotelRecordCount() { … … 159 177 } 160 178 179 161 180 @Override 162 181 public int getTotelRequestCount() { … … 164 183 } 165 184 185 166 186 @Override 167 187 public long getTotalBytesTransferred() { … … 169 189 } 170 190 191 171 192 @Override 172 193 public long getTotalTime() { … … 174 195 } 175 196 197 176 198 @Override 177 199 public long getNetworkTime() { 178 200 return timeNetwork; 179 201 } 180 202 203 181 204 @Override 182 205 public long getWaitTime() { … … 184 207 } 185 208 209 186 210 @Override 187 211 public long getProcessingTime() { … … 189 213 } 190 214 191 215 192 216 @Override 193 217 public HarvestHandler getHarvestHandler() { … … 195 219 } 196 220 221 197 222 @Override 198 223 public Date getFrom() { 199 224 return from; 200 225 } 201 226 227 202 228 @Override 203 229 public void setFrom(Date from) { … … 207 233 this.from = from; 208 234 } 209 235 236 210 237 @Override 211 238 public Date getUntil() { 212 239 return until; 213 240 } 214 241 242 215 243 @Override 216 244 public void setUntil(Date until) { … … 221 249 } 222 250 251 223 252 @Override 224 253 public String getSet() { 225 254 return set; 226 255 } 256 227 257 228 258 @Override … … 238 268 } 239 269 270 240 271 @Override 241 272 public List<String> getMetadataPrefixes() { 242 273 return metadataPrefixes; 243 274 } 244 275 276 245 277 @Override 246 278 public void setMetadataPrefixes(List<String> metadataPrefixes) { … … 254 286 } 255 287 } 256 288 289 257 290 @Override 258 291 public Statistics getStatistics(String prefix) { … … 271 304 } 272 305 306 273 307 @Override 274 308 public List<Statistics> getStatistics() { … … 283 317 } 284 318 319 285 320 @Override 286 321 public void run() throws HarvesterException { 287 322 harvester.doRunJob(this); 288 323 } 324 289 325 290 326 @Override … … 
294 330 } 295 331 332 296 333 @Override 297 334 public long getDelay(TimeUnit unit) { … … 299 336 TimeUnit.MILLISECONDS); 300 337 } 338 339 340 @Override 341 public boolean equals(Object obj) { 342 if (obj == null) { 343 return false; 344 } 345 if (obj == this) { 346 return true; 347 } 348 if (!(obj instanceof HarvestJobImpl)) { 349 return false; 350 } 351 final HarvestJobImpl rhs = (HarvestJobImpl) obj; 352 return (this.id == rhs.id) && this.harvester.equals(rhs.harvester); 353 } 354 355 @Override 356 public int hashCode() { 357 return super.hashCode(); 358 } 359 301 360 302 361 @Override … … 312 371 } 313 372 } 373 314 374 315 375 @Override … … 327 387 } 328 388 389 329 390 public URI getRepositoryURI() { 330 391 return repositoryURI; 331 392 } 393 332 394 333 395 void setState(State state) { … … 341 403 } 342 404 405 343 406 void setRepositoryName(String repositoryName) { 344 407 this.repositoryName = repositoryName; 345 408 } 346 409 410 347 411 void setBaseURL(String baseURL) { 348 412 this.baseURL = baseURL; 349 413 } 350 414 415 351 416 void setProtocolVersion(String protocolVersion) { 352 417 this.protocolVersion = protocolVersion; 353 418 } 354 419 420 355 421 void setAdminEmail(List<String> adminEmail) { 356 422 this.adminEmail = adminEmail; 357 423 } 358 424 425 359 426 void setEarliestTimestamp(Date earliestTimestamp) { 360 427 this.earliestTimestamp = earliestTimestamp; 361 428 } 362 429 430 363 431 void setDeletedNotion(DeletedNotion deletedNotion) { 364 432 this.deletedNotion = deletedNotion; 365 433 } 366 434 435 367 436 void setGranularity(Granularity granularity) { 368 437 this.granularity = granularity; 369 438 } 370 439 440 371 441 Granularity getGranularity() { 372 442 return granularity; 373 443 } 374 444 445 375 446 void setCompressionMask(int compressionMask) { 376 447 this.compressionMask = compressionMask; 377 448 } 378 449 450 379 451 int getCompressionMask() { 380 452 return compressionMask; 381 453 } 382 454 455 383 456 void 
setDescriptions(List<Description> descriptions) { 384 457 this.descriptions = descriptions; 385 458 } 459 386 460 387 461 void incRequestCount() { … … 393 467 } 394 468 469 395 470 void incResumptionCount() { 396 471 final StatisticsImpl stats = getCurrentStatistics(); … … 400 475 } 401 476 477 402 478 void addToNetworkTime(long delta) { 403 479 timeNetwork += delta; 404 480 } 481 405 482 406 483 void finishRequest(long bytesTransferred, long timeProcessingDelta) { … … 413 490 } 414 491 492 415 493 boolean isState(State state) { 416 494 return this.state == state; 417 495 } 418 496 497 419 498 Task getTask() { 420 499 return task; 421 500 } 422 501 502 423 503 void setTask(Task task) { 424 504 this.task = task; 425 505 } 506 426 507 427 508 String getCurrentPrefix() { … … 441 522 } 442 523 524 443 525 boolean removeCurrentPrefix() { 444 526 prefixWorklist.remove(0); … … 446 528 } 447 529 530 448 531 String getResumptionToken() { 449 532 return resumptionToken; 450 533 } 451 534 535 452 536 void setResumptionToken(String resumptionToken) { 453 537 this.resumptionToken = resumptionToken; 454 538 } 455 539 540 456 541 boolean isHarvestingNewPrefix() { 457 542 return (resumptionToken == null) && (networkRetryCount == 0); 458 543 } 459 544 545 460 546 int incNetworkRetryCount() { 461 547 return ++networkRetryCount; 462 548 } 463 549 550 464 551 int getNetworkRetryCount() { 465 552 return networkRetryCount; 466 553 } 554 467 555 468 556 void resetNetworkRetryCount() { … … 471 559 } 472 560 561 473 562 void setNetworkRequestDelay(long delay) { 474 563 this.timeDelayed += delay; 475 564 this.delayUntil = System.currentTimeMillis() + delay; 476 565 } 566 477 567 478 568 void onStartListingRecords() { … … 488 578 } 489 579 580 490 581 void onFinishListingRecords() { 491 582 Date latestDatestamp = null; … … 501 592 resumptionToken = null; 502 593 } 594 503 595 504 596 void onRecord(Header header) { … … 514 606 } 515 607 608 516 609 void onRecordMetadata(Header header, 
XMLStreamReader reader) { 517 610 if (handler != null) { … … 520 613 } 521 614 615 522 616 void onRecordAbout(Header header, XMLStreamReader reader) { 523 617 if (handler != null) { … … 526 620 } 527 621 622 528 623 void onIdentify() { 529 624 if (handler != null) { … … 533 628 return repositoryName; 534 629 } 535 630 631 536 632 @Override 537 633 public String getProtocolVersion() { 538 634 return protocolVersion; 539 635 } 540 636 637 541 638 @Override 542 639 public Granularity getGranularity() { 543 640 return granularity; 544 641 } 545 642 643 546 644 @Override 547 645 public Date getEarliestTimestamp() { 548 646 return earliestTimestamp; 549 647 } 550 648 649 551 650 @Override 552 651 public DeletedNotion getDeletedNotion() { 553 652 return deletedNotion; 554 653 } 555 654 655 556 656 @Override 557 657 public int getCompressionMask() { 558 658 return compressionMask; 559 659 } 560 660 661 561 662 @Override 562 663 public String getBaseURL() { 563 664 return baseURL; 564 665 } 565 666 667 566 668 @Override 567 669 public List<String> getAdminEmail() { 568 return (adminEmail != null) ? 569 Collections.unmodifiableList(adminEmail) : null; 570 } 670 return (adminEmail != null) ? Collections 671 .unmodifiableList(adminEmail) : null; 672 } 673 571 674 572 675 @Override 573 676 public List<Description> getDescriptions() { 574 return (descriptions != null) ? 575 Collections.unmodifiableList(descriptions) : null;677 return (descriptions != null) ? Collections 678 .unmodifiableList(descriptions) : null; 576 679 } 577 680 }; … … 579 682 } 580 683 } 684 581 685 582 686 void onListMetadataFormats(List<MetadataFormat> metadataFormats) { … … 588 692 } 589 693 694 590 695 InputStream wrap(InputStream stream) throws IOException { 591 696 if (handler != null) { … … 596 701 } 597 702 703 598 704 private StatisticsImpl getCurrentStatistics() { 599 705 if ((statistics != null) && !statistics.isEmpty()) { -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java
r1163 r3011 7 7 import java.util.List; 8 8 9 import javax.xml.namespace.QName; 9 10 import javax.xml.stream.XMLStreamException; 10 11 … … 12 13 import org.apache.http.HttpResponse; 13 14 import org.apache.http.HttpStatus; 15 import org.apache.http.client.HttpClient; 14 16 import org.apache.http.client.methods.HttpGet; 17 import org.apache.http.client.utils.HttpClientUtils; 15 18 import org.apache.http.impl.client.DefaultHttpClient; 16 19 import org.apache.http.impl.cookie.DateParseException; … … 19 22 import org.apache.http.params.CoreProtocolPNames; 20 23 import org.apache.http.params.HttpParams; 24 import org.apache.http.util.EntityUtils; 21 25 import org.joda.time.DateTime; 22 26 import org.joda.time.DateTimeZone; … … 24 28 import org.joda.time.format.DateTimeFormat; 25 29 import org.joda.time.format.DateTimeFormatter; 30 import org.slf4j.Logger; 31 import org.slf4j.LoggerFactory; 26 32 27 33 import eu.clarin.cmdi.oai.harvester.Description; 28 34 import eu.clarin.cmdi.oai.harvester.HarvestJob.State; 29 35 import eu.clarin.cmdi.oai.harvester.HarvesterException; 36 import eu.clarin.cmdi.oai.harvester.HarvesterProtocolErrorException; 30 37 import eu.clarin.cmdi.oai.harvester.MetadataFormat; 38 import eu.clarin.cmdi.oai.harvester.ProtocolError; 31 39 import eu.clarin.cmdi.oai.harvester.Repository; 32 40 import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion; … … 34 42 import eu.clarin.cmdi.oai.harvester.ext.DescriptionParser; 35 43 import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task; 44 36 45 37 46 public class HarvestWorker { … … 48 57 private static final String COMPRESSION_GZIP = "gzip"; 49 58 private static final String VERB_IDENTIFY = "Identify"; 50 private static final String VERB_LIST_METADATAFORMATS = 51 "ListMetadataFormats"; 59 private static final String VERB_LIST_METADATAFORMATS = "ListMetadataFormats"; 52 60 private static final String VERB_LIST_RECORDS = "ListRecords"; 53 61 private static final String HEADER_ACCEPT_ENCODING = 
"Accept-Encoding"; 54 62 private static final String HEADER_CONNECTION = "Connection"; 55 private static final String[] DATEFORMATS_DAYS = 56 { "yyyy'-'MM'-'dd" }; 57 private static final String[] DATEFORMATS_FULL = 58 { "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'", "yyyy'-'MM'-'dd" }; 63 private static final String DATEFORMAT_SECONDS = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'Z'"; 64 private static final String DATEFORMAT_DAYS = "yyyy'-'MM'-'dd"; 65 private static final String[] DATEFORMATS = 66 { DATEFORMAT_SECONDS, DATEFORMAT_DAYS }; 67 private static final Logger logger = 68 LoggerFactory.getLogger(HarvestWorker.class); 59 69 private final AbstractHarvester harvester; 60 private final DefaultHttpClient client;70 private final HttpClient client; 61 71 private final RequestUriBuilder uriBuilder; 62 72 private final HeaderImpl header; 63 73 private final XmlStreamReaderProxy readerWrapper; 64 74 private long delay; 75 65 76 66 77 HarvestWorker(AbstractHarvester harvester) { … … 69 80 client = new DefaultHttpClient(); 70 81 client.getParams().setParameter(CoreProtocolPNames.USER_AGENT, 71 82 "eu.clarin.cmdi.oai.Harvester/0.0.1"); 72 83 // request stuff 73 84 uriBuilder = new RequestUriBuilder(); 74 85 // other stuff 75 this.header = new HeaderImpl();86 this.header = new HeaderImpl(); 76 87 this.readerWrapper = new XmlStreamReaderProxy(); 77 88 } 89 78 90 79 91 long process(HarvestJobImpl job, int maxNetworkTries) … … 115 127 } else if ((job.getResumptionToken() != null) && (delay == 0)) { 116 128 /* 117 * special case: request successful, but list was split 118 * dieto OAI flow control129 * special case: request successful, but list was split due 130 * to OAI flow control 119 131 */ 120 132 ok = true; … … 151 163 } 152 164 165 153 166 private boolean doIdentifyTask(HarvestJobImpl job) throws IOException, 154 167 XMLStreamException, HarvesterException { … … 171 184 s = response.readContent(OAI_NS, "protocolVersion", true); 172 185 if (!PROTOCOL_VERSION.equals(s)) { 173 throw new 
HarvesterException( "unsupported protocol version: " +174 s);186 throw new HarvesterException( 187 "unsupported protocol version: " + s); 175 188 } 176 189 job.setProtocolVersion(PROTOCOL_VERSION); … … 178 191 List<String> adminEmail = null; 179 192 do { 180 s = response.readContent(OAI_NS, "adminEmail",181 (adminEmail == null));193 s = response.readContent(OAI_NS, 194 "adminEmail", (adminEmail == null)); 182 195 if (s != null) { 183 196 if (adminEmail == null) { … … 190 203 191 204 /* 192 * defer setting of earliestDatestamp in job until we know193 * the datestamp granularity of the repository205 * defer setting of earliestDatestamp in job until we know the 206 * datestamp granularity of the repository 194 207 */ 195 208 final String earliestDatestamp = 196 response.readContent(OAI_NS, "earliestDatestamp", true);209 response.readContent(OAI_NS, "earliestDatestamp", true); 197 210 198 211 s = response.readContent(OAI_NS, "deletedRecord", true); … … 242 255 namespaceURI, localName); 243 256 if (parser != null) { 244 Description desc = 245 parser.parseDescription(response.getXMLStreamReader());257 Description desc = parser.parseDescription( 258 response.getXMLStreamReader()); 246 259 if (desc != null) { 247 260 if (descriptions == null) { … … 251 264 } 252 265 } else { 253 System.err.println("skipping DESC " + namespaceURI + ", " +254 localName);266 logger.info("skipping <description> for {}", new QName( 267 namespaceURI, localName)); 255 268 } 256 269 response.readEnd(OAI_NS, "description", true); … … 259 272 response.readEnd(OAI_NS, "Identify"); 260 273 response.close(); 261 274 262 275 job.onIdentify(); 263 276 … … 269 282 } 270 283 } 284 271 285 272 286 private boolean doListMetadataFormatsTask(HarvestJobImpl job) … … 282 296 response.readStart(OAI_NS, "ListMetadataFormats", true); 283 297 List<MetadataFormat> metadataFormats = 284 new ArrayList<MetadataFormat>();298 new ArrayList<MetadataFormat>(); 285 299 while (response.readStart(OAI_NS, "metadataFormat", 286 
300 metadataFormats.isEmpty())) { 287 301 final String prefix = 288 response.readContent(OAI_NS, "metadataPrefix", true);302 response.readContent(OAI_NS, "metadataPrefix", true); 289 303 final String schema = 290 response.readContent(OAI_NS, "schema", true);304 response.readContent(OAI_NS, "schema", true); 291 305 final String namespace = 292 response.readContent(OAI_NS, "metadataNamespace", true);306 response.readContent(OAI_NS, "metadataNamespace", true); 293 307 metadataFormats.add( 294 308 new MetadataFormatImpl(prefix, schema, namespace)); … … 299 313 300 314 job.onListMetadataFormats(metadataFormats); 301 315 302 316 return true; 303 317 } finally { … … 307 321 } 308 322 } 323 309 324 310 325 private boolean doListRecordsTask(HarvestJobImpl job) throws IOException, … … 344 359 boolean first = true; 345 360 while (response.readStart(OAI_NS, "record", first)) { 361 boolean deleted = false; 362 346 363 response.readStart(OAI_NS, "header", true, true); 347 364 String s = response.readAttributeValue(null, "status"); … … 349 366 if ("deleted".equals(s)) { 350 367 header.setDeleted(true); 368 deleted = true; 351 369 } else { 352 throw new XMLStreamException("attribute 'status' of " + 353 "element 'record' must contain 'deleted'"); 370 throw new XMLStreamException("attribute 'status' of " 371 + "element 'record' must contain 'deleted'", 372 response.getLocation()); 354 373 } 355 374 } … … 365 384 response.readEnd(OAI_NS, "header"); 366 385 job.onRecord(header); 367 if (response.readStart(OAI_NS, "metadata", true)) { 368 response.consumeWhitespace(); 369 readerWrapper.reset(response.getXMLStreamReader()); 370 job.onRecordMetadata(header, readerWrapper); 371 response.consumeWhitespace(); 372 response.readEnd(OAI_NS, "metadata"); 386 /* 387 * only parse metadata from non-deleted records 388 */ 389 if (!deleted) { 390 if (response.readStart(OAI_NS, "metadata", true)) { 391 response.consumeWhitespace(); 392 readerWrapper.reset(response.getXMLStreamReader()); 393 
job.onRecordMetadata(header, readerWrapper); 394 response.consumeWhitespace(); 395 response.readEnd(OAI_NS, "metadata"); 396 } 373 397 } 374 398 while (response.readStart(OAI_NS, "about", false)) { … … 398 422 } 399 423 response.readEnd(OAI_NS, "ListRecords"); 400 424 401 425 response.close(); 402 426 403 427 return (nextToken == null); 428 } catch (HarvesterProtocolErrorException e) { 429 /* 430 * XXX: handle NO_RECORDS_MATCH protocol error 431 */ 432 List<ProtocolError> errors = e.getErrors(); 433 if ((errors != null) && !errors.isEmpty()) { 434 for (ProtocolError error : errors) { 435 if (error.isCode(ProtocolError.Code.NO_RECORDS_MATCH)) { 436 logger.debug("no records match prefix '{}'", 437 job.getCurrentPrefix()); 438 return true; 439 } 440 } 441 } 442 throw e; 404 443 } finally { 405 444 if (response != null) { … … 409 448 } 410 449 450 411 451 private Response execute(HarvestJobImpl job, boolean compress) 412 452 throws IOException, XMLStreamException, HarvesterException { 413 long now = System.currentTimeMillis();453 final long now = System.currentTimeMillis(); 414 454 job.incRequestCount(); 415 455 final URI uri = uriBuilder.createURI(job.getRepositoryURI()); 416 System.err.println("HTTP GET " +uri);456 logger.debug("Performing HTTP GET request to {}", uri); 417 457 final HttpGet request = new HttpGet(uri); 418 458 if (compress) { 419 if ((job.getCompressionMask() &420 459 final int mask = job.getCompressionMask(); 460 if ((mask & Repository.COMPRESSION_METHOD_GZIP) > 0) { 421 461 request.addHeader(HEADER_ACCEPT_ENCODING, "gzip"); 422 462 } 423 if ((job.getCompressionMask() & 424 Repository.COMPRESSION_METHOD_DEFLATE) > 0) { 463 if ((mask & Repository.COMPRESSION_METHOD_DEFLATE) > 0) { 425 464 request.addHeader(HEADER_ACCEPT_ENCODING, "deflate"); 426 465 } … … 434 473 435 474 final HttpResponse response = client.execute(request); 436 job.addToNetworkTime(System.currentTimeMillis() - now);437 int status = response.getStatusLine().getStatusCode();438 if 
(status == HttpStatus.SC_OK) {439 delay = 0;440 return new Response(job, response.getEntity(), harvester);441 } else {442 request.abort();443 if (status == HttpStatus.SC_ SERVICE_UNAVAILABLE) {444 System.err.println("-> " + status);445 delay = parseTryAfter(response);475 /* 476 * FIXME: the following code need serious re-factoring 477 */ 478 boolean close = false; 479 try { 480 job.addToNetworkTime(System.currentTimeMillis() - now); 481 final int status = response.getStatusLine().getStatusCode(); 482 if (status == HttpStatus.SC_OK) { 483 delay = 0; 484 return new Response(job, response, harvester); 446 485 } else { 447 throw new HarvesterException( 448 "provider returned unexpected HTTP status: " + status); 449 } 450 } 451 return null; 452 } 486 if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) { 487 delay = parseTryAfter(response); 488 logger.debug("got service unavailable status, retrying " + 489 "after {} seconds", delay); 490 close = true; 491 return null; 492 } else { 493 throw new HarvesterException( 494 "provider returned unexpected HTTP status: " + 495 status); 496 } 497 } 498 } catch (IOException e) { 499 close = true; 500 throw e; 501 } catch (XMLStreamException e) { 502 close = true; 503 throw e; 504 } catch (HarvesterException e) { 505 close = true; 506 throw e; 507 } finally { 508 if (close) { 509 /* 510 * try hard to release HTTP client resources ... 
511 */ 512 try { 513 EntityUtils.consume(response.getEntity()); 514 } catch (IOException ex) { 515 /* IGNORE */ 516 } 517 518 /* make sure to release allocated resources */ 519 HttpClientUtils.closeQuietly(response); 520 521 if (request != null) { 522 request.abort(); 523 } 524 } 525 } 526 } 527 453 528 454 529 private long parseTryAfter(HttpResponse response) { … … 474 549 } 475 550 476 private Date parseDate(Repository.Granularity granularity, String s) 551 552 private static Date parseDate(Repository.Granularity granularity, String s) 477 553 throws HarvesterException { 478 String[] patterns = null; 554 final MutableDateTime date = new MutableDateTime(0); 555 for (int i = 0; i < DATEFORMATS.length; i++) { 556 DateTimeFormatter fmt = DateTimeFormat 557 .forPattern(DATEFORMATS[i]) 558 .withZone(DateTimeZone.UTC); 559 if (fmt.parseInto(date, s, 0) == s.length()) { 560 if (DATEFORMATS[i].equals(DATEFORMAT_SECONDS) && 561 (granularity == Repository.Granularity.DAYS)) { 562 logger.warn("repository announced DAYS granularity but " + 563 "provided timestamp with SECONDS granularity"); 564 } 565 if (DATEFORMATS[i].equals(DATEFORMAT_DAYS)) { 566 date.setTime(0, 0, 0, 0); 567 } 568 return date.toDate(); 569 } 570 } 571 throw new HarvesterException("invalid date: " + s); 572 } 573 574 575 private static String formatDate(Repository.Granularity granularity, 576 Date date) throws HarvesterException { 577 DateTimeFormatter fmt = null; 479 578 switch (granularity) { 480 579 case DAYS: 481 patterns = DATEFORMATS_DAYS; 580 fmt = DateTimeFormat.forPattern(DATEFORMAT_DAYS) 581 .withZone(DateTimeZone.UTC); 482 582 break; 483 583 case SECONDS: 484 patterns = DATEFORMATS_FULL; 485 } 486 MutableDateTime date = new MutableDateTime(0); 487 for (int i = 0; i < patterns.length; i++) { 488 DateTimeFormatter fmt = DateTimeFormat 489 .forPattern(patterns[i]) 584 fmt = DateTimeFormat.forPattern(DATEFORMAT_SECONDS) 490 585 .withZone(DateTimeZone.UTC); 491 if (fmt.parseInto(date, s, 0) == 
s.length()) {492 if (patterns[i].equals(DATEFORMATS_DAYS[0])) {493 date.setTime(0, 0, 0, 0);494 }495 return date.toDate();496 }497 }498 throw new HarvesterException("invalid date: " + s);499 }500 501 private String formatDate(Repository.Granularity granularity, Date date) {502 String pattern = null;503 switch (granularity) {504 case DAYS:505 pattern = DATEFORMATS_DAYS[0];506 586 break; 507 case SECONDS: 508 pattern = DATEFORMATS_FULL[0]; 509 } 510 DateTimeFormatter fmt = 511 DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.UTC); 587 default: 588 /* cannot happen, but silence FindBugs warning */ 589 throw new HarvesterException("invalid granularity: " + granularity); 590 } 512 591 return fmt.print(new DateTime(date)); 513 592 } -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/ParallelHarvester.java
r1148 r3011 8 8 import java.util.concurrent.atomic.AtomicLong; 9 9 10 import org.slf4j.Logger; 11 import org.slf4j.LoggerFactory; 12 10 13 import eu.clarin.cmdi.oai.harvester.HarvestHandler; 11 14 import eu.clarin.cmdi.oai.harvester.HarvestJob; … … 13 16 import eu.clarin.cmdi.oai.harvester.HarvesterException; 14 17 18 15 19 public class ParallelHarvester extends AbstractHarvester { 16 20 private final class WorkerThread extends Thread { 17 private final HarvestWorker worker = 18 new HarvestWorker(ParallelHarvester.this);21 private final HarvestWorker worker = new HarvestWorker( 22 ParallelHarvester.this); 19 23 private AtomicBoolean loop = new AtomicBoolean(true); 20 24 25 21 26 @Override 22 27 public void run() { 23 28 while (loop.get()) { 24 29 try { 25 System.err.println(getId() + " waiting ...");26 HarvestJobImpl job = queue.take();27 System.err.println(getId() + " got job " +job.getId());30 logger.debug("[{}] waiting ...", getId()); 31 final HarvestJobImpl job = queue.take(); 32 logger.debug("[{}] got job {}", getId(), job.getId()); 28 33 try { 29 34 while (true) { 30 System.err.println(getId() + " process job " +31 job.getId());35 logger.debug("[{}] processing job {} ...", 36 getId(), job.getId()); 32 37 worker.process(job, maxNetworkRetryCount); 33 38 if (job.isRunning()) { 34 39 if (job.getDelay(TimeUnit.MILLISECONDS) > 0) { 35 40 queue.offer(job); 36 System.err.println(getId() +37 " requeuing job " +job.getId());41 logger.debug("[{}] requeuing job {} ...", 42 getId(), job.getId()); 38 43 } else { 39 44 continue; … … 49 54 } 50 55 } // while 51 System.err.println(getId() + " ... 
exit");56 logger.debug("[{}] exiting ...", getId()); 52 57 } 58 53 59 54 60 void shutdown() { … … 57 63 } 58 64 } 65 private static final Logger logger = 66 LoggerFactory.getLogger(ParallelHarvester.class); 67 private final HashSet<WorkerThread> workers = 68 new HashSet<WorkerThread>(); 69 private final DelayQueue<HarvestJobImpl> queue = 70 new DelayQueue<HarvestJobImpl>(); 71 private final AtomicLong nextId = new AtomicLong(0); 59 72 60 private final HashSet<WorkerThread> workers = new HashSet<WorkerThread>();61 private final DelayQueue<HarvestJobImpl> queue = new DelayQueue<HarvestJobImpl>();62 private AtomicLong nextId = new AtomicLong();63 73 64 74 private ParallelHarvester() { … … 68 78 } 69 79 80 70 81 @Override 71 82 public HarvestJob createJob(URI repositoryURI, HarvestHandler handler) … … 74 85 repositoryURI, handler); 75 86 } 87 76 88 77 89 public static Harvester newInstance() { … … 83 95 } 84 96 97 85 98 @Override 86 99 public void shutdown() { … … 90 103 w.join(); 91 104 } catch (InterruptedException e) { 92 System.err.println("join fail!");105 logger.error("interrupted while shutting down"); 93 106 } 94 107 } … … 102 115 } 103 116 117 104 118 @Override 105 119 void doCancelJob(HarvestJobImpl job) throws HarvesterException { 106 120 queue.remove(job); 107 121 } 122 108 123 109 124 @Override -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/Response.java
r1904 r3011 10 10 import java.util.zip.InflaterInputStream; 11 11 12 import javax.xml.namespace.NamespaceContext; 12 13 import javax.xml.namespace.QName; 14 import javax.xml.stream.Location; 13 15 import javax.xml.stream.XMLStreamConstants; 14 16 import javax.xml.stream.XMLStreamException; 15 17 import javax.xml.stream.XMLStreamReader; 18 import javax.xml.stream.XMLStreamWriter; 16 19 17 20 import org.apache.http.HttpEntity; 21 import org.apache.http.HttpResponse; 22 import org.apache.http.client.utils.HttpClientUtils; 18 23 import org.apache.http.util.EntityUtils; 24 import org.codehaus.stax2.XMLStreamReader2; 25 import org.slf4j.Logger; 26 import org.slf4j.LoggerFactory; 19 27 20 28 import eu.clarin.cmdi.oai.harvester.HarvesterException; … … 22 30 import eu.clarin.cmdi.oai.harvester.ProtocolError; 23 31 32 24 33 final class Response { 25 34 private static final class CountingInputStream extends FilterInputStream { 26 35 private long count = 0; 27 36 37 28 38 private CountingInputStream(final InputStream stream) { 29 39 super(stream); 30 40 } 41 31 42 32 43 @Override … … 36 47 return result; 37 48 } 49 38 50 39 51 @Override … … 45 57 } 46 58 59 47 60 @Override 48 61 public int read(byte[] buffer) throws IOException { … … 52 65 } 53 66 67 54 68 @Override 55 69 public long skip(long n) throws IOException { … … 58 72 return result; 59 73 } 60 74 75 61 76 public long getByteCount() { 62 77 return count; … … 64 79 65 80 } // class CountingInputStream 66 67 private static final String OAI_NS = 68 "http://www.openarchives.org/OAI/2.0/"; 69 private final HttpEntity entity; 81 private static final String OAI_NS = "http://www.openarchives.org/OAI/2.0/"; 82 private static final String ENCODING_GZIP = "gzip"; 83 private static final String ENCODING_DEFLATE = "defalte"; 84 private static final Logger logger = 85 LoggerFactory.getLogger(Response.class); 86 private final HttpResponse response; 70 87 private final long now = System.currentTimeMillis(); 71 private final 
CountingInputStream stream; 72 private final XMLStreamReader reader; 73 74 Response(HarvestJobImpl job, HttpEntity entity, AbstractHarvester harvester) 88 private final CountingInputStream stream; 89 private final XMLStreamReader2 reader; 90 91 92 Response(HarvestJobImpl job, HttpResponse response, AbstractHarvester harvester) 75 93 throws IOException, XMLStreamException, HarvesterException { 76 this.entity = entity; 77 94 if (job == null) { 95 throw new NullPointerException("job == null"); 96 } 97 if (response == null) { 98 throw new NullPointerException("response == null"); 99 } 100 if (harvester == null) { 101 throw new NullPointerException("harvester == null"); 102 } 103 this.response = response; 104 105 final HttpEntity entity = response.getEntity(); 78 106 if (entity.getContentType() != null) { 79 System.err.println("Content-Type: " + 80 entity.getContentType().getValue()); 81 System.err.println("Content-Length: " + entity.getContentLength()); 82 } 83 InputStream in = entity.getContent(); 107 logger.debug("Content-Type: {}", entity.getContentType().getValue()); 108 logger.debug("Content-Length: {}", entity.getContentLength()); 109 } 110 InputStream in = entity.getContent(); 84 111 org.apache.http.Header enc = entity.getContentEncoding(); 85 112 if (enc != null) { 86 System.err.println("Content-Encoding: " +87 entity.getContentEncoding().getValue());88 if ( "gzip".equalsIgnoreCase(enc.getValue())) {113 final String encoding = enc.getValue(); 114 logger.debug("Content-Encoding: {}", encoding); 115 if (ENCODING_GZIP.equalsIgnoreCase(encoding)) { 89 116 in = new GZIPInputStream(in); 90 } else if ( "deflate".equalsIgnoreCase(enc.getValue())) {117 } else if (ENCODING_DEFLATE.equalsIgnoreCase(encoding)) { 91 118 in = new InflaterInputStream(in, new Inflater(true)); 119 } else { 120 throw new HarvesterException("Unsupported content encoding " + 121 "in HTTP response: " + encoding); 92 122 } 93 123 } … … 97 127 readContent(OAI_NS, "responseDate", true); 98 128 
readStart(OAI_NS, "request", true, true); 99 // System.err.println("YYY XXX = " + toReadable(reader) + ", " +100 // reader.getAttributeValue(null, "verb"));101 129 readEnd(OAI_NS, "request", true); 102 130 … … 116 144 } 117 145 } 118 146 147 119 148 public void close() throws XMLStreamException { 120 149 readEnd(OAI_NS, "OAI-PMH"); 121 150 } 151 122 152 123 153 public void release(HarvestJobImpl job) { … … 134 164 /* IGNORE */ 135 165 } 166 167 /* 168 * try hard to release HTTP client resources ... 169 */ 136 170 try { 137 EntityUtils.consume( entity);138 } catch (IOException e ) {171 EntityUtils.consume(response.getEntity()); 172 } catch (IOException ex) { 139 173 /* IGNORE */ 140 174 } 141 } 142 143 public boolean readStart(String namespaceURI, String localName, 144 boolean required) throws XMLStreamException { 175 176 /* make sure to release allocated resources */ 177 HttpClientUtils.closeQuietly(response); 178 } 179 180 181 XMLStreamReader getXMLStreamReader() { 182 return reader; 183 } 184 185 186 Location getLocation() { 187 return reader.getLocation(); 188 } 189 190 191 boolean readStart(String namespaceURI, String localName, boolean required) 192 throws XMLStreamException { 145 193 return readStart(namespaceURI, localName, required, false); 146 194 } 147 195 148 public boolean readStart(String namespaceURI, String localName, 149 boolean required, boolean attributes) throws XMLStreamException { 150 // System.err.println("readStart (" + localName + ", required = " + 151 // required + ") @ " + toReadable(reader)); 196 197 boolean readStart(String namespaceURI, String localName, boolean required, 198 boolean attributes) throws XMLStreamException { 199 // System.err.println("readStart (" + localName + ", required = " + 200 // required + ") @ " + toReadable(reader)); 152 201 if (!reader.isEndElement()) { 153 202 while (reader.hasNext()) { 154 203 // System.err.println(" LOOP: " + dumpState()); 155 if (reader.isWhiteSpace()) { 156 reader.next(); 157 continue; 158 
} 204 consumeWhitespace(); 159 205 if (reader.isStartElement()) { 160 206 if (namespaceURI.equals(reader.getNamespaceURI()) && 161 207 localName.equals(reader.getLocalName())) { 162 //System.err.print("--> found ");208 // System.err.print("--> found "); 163 209 if (!attributes) { 164 //System.err.print("and consumed ");210 // System.err.print("and consumed "); 165 211 reader.next(); // skip to next event 166 212 } 167 //System.err.println("@ " + toReadable(reader));213 // System.err.println("@ " + toReadable(reader)); 168 214 return true; 169 215 } … … 177 223 } 178 224 if (required) { 179 // System.err.println("--> error, not found @ " + toReadable(reader)); 180 throw new XMLStreamException("expected element '" + 181 new QName(namespaceURI, localName) + "', but found '" + 182 reader.getName() + "'", reader.getLocation()); 183 } 184 // System.err.println("--> not found @ " + toReadable(reader)); 225 logger.error("fail: {}", dumpState()); 226 227 // System.err.println("--> error, not found @ " + 228 // toReadable(reader)); 229 if (reader.hasName()) { 230 throw new XMLStreamException("expected element '" + 231 new QName(namespaceURI, localName) + "', but found '" + 232 reader.getName() + "'", reader.getLocation()); 233 } else { 234 throw new XMLStreamException("unexpacted state while scanning for start tag for element '" + new QName(namespaceURI, localName) + "'", reader.getLocation()); 235 } 236 } 237 // System.err.println("--> not found @ " + toReadable(reader)); 185 238 return false; 186 239 } 187 240 188 public void readEnd(String namespaceURI, String localName) 241 242 void readEnd(String namespaceURI, String localName) 189 243 throws XMLStreamException { 190 244 readEnd(namespaceURI, localName, false); 191 245 } 192 246 193 public void readEnd(String namespaceURI, String localName, 194 boolean skipContent) throws XMLStreamException { 195 // System.err.println("readEnd (" + localName + ") @ " + dumpState() + 196 // ", skipContent = " + skipContent); 247 248 void 
readEnd(String namespaceURI, String localName, boolean skipContent) 249 throws XMLStreamException { 250 // System.err.println("readEnd (" + localName + ") @ " + dumpState() + 251 // ", skipContent = " + skipContent); 197 252 int level = 1; 198 253 while (reader.hasNext()) { 199 // System.err.println(" LOOP " + dumpState() + " [" + 200 // level + "]"); 201 if (reader.isWhiteSpace()) { 202 reader.next(); 203 continue; 204 } 254 // System.err.println(" LOOP " + dumpState() + " [" + 255 // level + "]"); 256 consumeWhitespace(); 205 257 if (skipContent) { 206 258 if (reader.isCharacters()) { … … 219 271 if (reader.isEndElement()) { 220 272 level--; 221 //System.err.println(" @END-TAG: " + dumpState() + " [" +222 //level + "]");273 // System.err.println(" @END-TAG: " + dumpState() + " [" + 274 // level + "]"); 223 275 if (level == 0) { 224 276 if (namespaceURI.equals(reader.getNamespaceURI()) && 225 localName.equals(reader.getLocalName())) {277 localName.equals(reader.getLocalName())) { 226 278 reader.next(); // consume tag 227 279 break; … … 236 288 reader.next(); 237 289 } 238 // System.err.println("--> ok @ " + dumpState()); 239 } 240 241 public String readContent(String namespaceURI, String localName, 242 boolean required) throws XMLStreamException { 290 // System.err.println("--> ok @ " + dumpState()); 291 } 292 293 294 boolean peekStart(String namespaceURI, String localName) 295 throws XMLStreamException { 296 if (!reader.isEndElement()) { 297 while (reader.hasNext()) { 298 // System.err.println(" LOOP: " + dumpState()); 299 if (reader.isWhiteSpace()) { 300 reader.next(); 301 continue; 302 } 303 if (reader.isStartElement()) { 304 if (namespaceURI.equals(reader.getNamespaceURI()) && 305 localName.equals(reader.getLocalName())) { 306 return true; 307 } else { 308 return false; 309 } 310 } 311 if (reader.isCharacters() || reader.isEndElement()) { 312 break; 313 } 314 reader.next(); 315 } // while 316 } 317 return false; 318 } 319 320 321 String readContent(String 
namespaceURI, String localName, boolean required) 322 throws XMLStreamException { 323 return readContent(namespaceURI, localName, required, true); 324 } 325 326 327 String readContent(String namespaceURI, String localName, boolean required, 328 boolean contentRequired) throws XMLStreamException { 243 329 String result = null; 244 330 if (readStart(namespaceURI, localName, required)) { 245 result = readString(true); 331 try { 332 result = readString(contentRequired); 333 if (!contentRequired && (result == null)) { 334 result = ""; 335 } 336 } catch (XMLStreamException e) { 337 StringBuilder sb = new StringBuilder(); 338 sb.append("element '"); 339 sb.append(new QName(namespaceURI, localName)); 340 sb.append(localName).append("' may not be empty"); 341 throw new XMLStreamException(sb.toString(), e.getLocation()); 342 } 246 343 readEnd(namespaceURI, localName); 247 344 } … … 249 346 } 250 347 251 public String readString(boolean required) 252 throws XMLStreamException { 253 // System.err.println("readString @ " + toReadable(reader)); 348 349 int readContent(String namespaceURI, String localName, boolean required, 350 int defaultValue) throws XMLStreamException { 351 if (readStart(namespaceURI, localName, required)) { 352 String s = readString(true); 353 try { 354 readEnd(namespaceURI, localName); 355 } catch (XMLStreamException e) { 356 StringBuilder sb = new StringBuilder(); 357 sb.append("element '"); 358 sb.append(new QName(namespaceURI, localName)); 359 sb.append(localName).append("' may not be empty"); 360 throw new XMLStreamException(sb.toString(), e.getLocation()); 361 } 362 try { 363 return Integer.parseInt(s); 364 } catch (NumberFormatException e) { 365 StringBuilder sb = new StringBuilder(); 366 sb.append("element '"); 367 sb.append(new QName(namespaceURI, localName)); 368 sb.append("' was expected to be of type xs:integer; "); 369 sb.append("incompatible value was: "); 370 sb.append(s); 371 throw new XMLStreamException(sb.toString(), 372 
reader.getLocation(), e); 373 } 374 } 375 return defaultValue; 376 } 377 378 379 String readString(boolean required) throws XMLStreamException { 380 // System.err.println("readString @ " + toReadable(reader)); 381 StringBuilder sb = new StringBuilder(); 382 while (reader.isCharacters()) { 383 String s = reader.getText(); 384 if (s != null) { 385 sb.append(s); 386 } 387 reader.next(); 388 } // while 254 389 String s = null; 255 if (reader.isCharacters()) { 256 s = reader.getText(); 257 if (s != null) { 258 s = s.trim(); 259 } 260 reader.next(); 390 if (sb.length() > 0) { 391 s = sb.toString().trim(); 261 392 } 262 393 if (required && ((s == null) || s.isEmpty())) { 263 394 throw new XMLStreamException("expected character content " 264 + "at position ", reader.getLocation());265 } 266 //System.err.println("--> ok @ " + toReadable(reader));395 + "at position ", reader.getLocation()); 396 } 397 // System.err.println("--> ok @ " + toReadable(reader)); 267 398 return s; 268 399 } 269 400 270 public String readAttributeValue(String namespaceURI, String localName) 401 402 String readAttributeValue(String namespaceURI, String localName) 271 403 throws XMLStreamException { 272 404 if (!reader.isStartElement()) { … … 281 413 } 282 414 283 public String readNamespaceURI() throws XMLStreamException { 415 416 String readNamespaceURI() throws XMLStreamException { 284 417 if (!reader.isStartElement()) { 285 418 throw new XMLStreamException("not at a start elment event", … … 289 422 } 290 423 291 public String peekElementLocalName() throws XMLStreamException { 424 425 String peekElementLocalName() throws XMLStreamException { 292 426 if (!reader.isStartElement()) { 293 427 throw new XMLStreamException("not at a start elment event", … … 297 431 } 298 432 299 public void consumeStart() throws XMLStreamException { 433 434 void consumeStart() throws XMLStreamException { 300 435 if (!reader.isStartElement()) { 301 436 throw new XMLStreamException("not at a start elment event", … … 305 
440 } 306 441 307 public void consumeWhitespace() throws XMLStreamException { 308 while (reader.isWhiteSpace() && reader.hasNext()) { 442 443 void consumeWhitespace() throws XMLStreamException { 444 outer: 445 while (reader.hasNext()) { 446 /* 447 * some hackish method to also skip stray UTF-8 BOMs; only works 448 * where whitespace is expected 449 */ 450 if (reader.isCharacters()) { 451 final char buffer[] = reader.getTextCharacters(); 452 final int end = reader.getTextStart() + reader.getTextLength(); 453 for (int i = reader.getTextStart(); i < end; i++) { 454 if (!(Character.isWhitespace(buffer[i]) || 455 (buffer[i] == 0x0FEFF))) { 456 break outer; 457 } 458 } 459 } else if (!reader.isWhiteSpace()) { 460 break outer; 461 } 309 462 reader.next(); 310 continue; 311 } 312 } 313 314 public XMLStreamReader getXMLStreamReader() { 315 return reader; 316 } 317 318 public String dumpState() { 463 } // while 464 } 465 466 467 void copyTo(XMLStreamWriter writer) throws XMLStreamException { 468 final int depth = reader.getDepth(); 469 do { 470 copyEvent(reader, writer); 471 reader.next(); 472 } while (reader.getDepth() >= depth); 473 } 474 475 476 private static void copyEvent(XMLStreamReader from, XMLStreamWriter to) 477 throws XMLStreamException { 478 switch (from.getEventType()) { 479 case XMLStreamConstants.START_DOCUMENT: 480 { 481 String version = from.getVersion(); 482 if (version == null || version.length() == 0) { 483 to.writeStartDocument(); 484 } else { 485 to.writeStartDocument(from.getCharacterEncodingScheme(), 486 from.getVersion()); 487 } 488 to.writeCharacters("\n"); 489 } 490 return; 491 492 case XMLStreamConstants.END_DOCUMENT: 493 to.writeCharacters("\n"); 494 to.writeEndDocument(); 495 return; 496 497 case XMLStreamConstants.START_ELEMENT: 498 copyStartElement(from, to); 499 return; 500 501 case XMLStreamConstants.END_ELEMENT: 502 to.writeEndElement(); 503 return; 504 505 case XMLStreamConstants.SPACE: 506 to.writeCharacters(from.getTextCharacters(), 
from.getTextStart(), 507 from.getTextLength()); 508 return; 509 510 case XMLStreamConstants.CDATA: 511 to.writeCData(from.getText()); 512 return; 513 514 case XMLStreamConstants.CHARACTERS: 515 to.writeCharacters(from.getTextCharacters(), from.getTextStart(), 516 from.getTextLength()); 517 return; 518 519 case XMLStreamConstants.COMMENT: 520 to.writeComment(from.getText()); 521 return; 522 523 case XMLStreamConstants.PROCESSING_INSTRUCTION: 524 to.writeProcessingInstruction(from.getPITarget(), from.getPIData()); 525 return; 526 527 case XMLStreamConstants.DTD: 528 case XMLStreamConstants.ENTITY_REFERENCE: 529 case XMLStreamConstants.ATTRIBUTE: 530 case XMLStreamConstants.NAMESPACE: 531 case XMLStreamConstants.ENTITY_DECLARATION: 532 case XMLStreamConstants.NOTATION_DECLARATION: 533 /* FALL_TROUGH */ 534 } 535 throw new XMLStreamException("unsupported event type: " + 536 from.getEventType()); 537 } 538 539 540 private static void copyStartElement(XMLStreamReader from, 541 XMLStreamWriter to) throws XMLStreamException { 542 final int nsCount = from.getNamespaceCount(); 543 if (nsCount > 0) { // yup, got some... 
544 for (int i = 0; i < nsCount; ++i) { 545 String pfx = from.getNamespacePrefix(i); 546 String uri = from.getNamespaceURI(i); 547 if ((pfx == null) || pfx.isEmpty()) { // default NS 548 to.setDefaultNamespace(uri); 549 } else { 550 to.setPrefix(pfx, uri); 551 } 552 } 553 } 554 555 final String prefix = from.getPrefix(); 556 final NamespaceContext from_ctx = from.getNamespaceContext(); 557 final NamespaceContext to_ctx = to.getNamespaceContext(); 558 boolean repair_prefix_namespace = false; 559 if ((prefix != null) && (to_ctx.getNamespaceURI(prefix) == null)) { 560 repair_prefix_namespace = true; 561 to.setPrefix(prefix, from_ctx.getNamespaceURI(prefix)); 562 } 563 564 to.writeStartElement(prefix, from.getLocalName(), 565 from.getNamespaceURI()); 566 567 if (nsCount > 0) { 568 // write namespace declarations 569 for (int i = 0; i < nsCount; ++i) { 570 String pfx = from.getNamespacePrefix(i); 571 String uri = from.getNamespaceURI(i); 572 573 if ((pfx == null) || pfx.isEmpty()) { // default NS 574 to.writeDefaultNamespace(uri); 575 } else { 576 to.writeNamespace(pfx, uri); 577 } 578 } 579 } 580 if (repair_prefix_namespace) { 581 to.writeNamespace(prefix, from_ctx.getNamespaceURI(prefix)); 582 } 583 584 int attrCount = from.getAttributeCount(); 585 if (attrCount > 0) { 586 for (int i = 0; i < attrCount; ++i) { 587 to.writeAttribute(from.getAttributePrefix(i), 588 from.getAttributeNamespace(i), 589 from.getAttributeLocalName(i), 590 from.getAttributeValue(i)); 591 } 592 } 593 } 594 595 596 String dumpState() { 319 597 StringBuilder sb = new StringBuilder(); 320 598 switch (reader.getEventType()) { 321 599 case XMLStreamConstants.START_DOCUMENT: 322 return "START_DOC ";600 return "START_DOC["; 323 601 case XMLStreamConstants.END_DOCUMENT: 324 return "END_DOC ";602 return "END_DOC["; 325 603 case XMLStreamConstants.START_ELEMENT: 326 604 sb.append("START["); … … 328 606 sb.append(","); 329 607 sb.append(reader.getLocalName()); 330 sb.append("]");331 608 break; 332 609 
case XMLStreamConstants.END_ELEMENT: … … 335 612 sb.append(","); 336 613 sb.append(reader.getLocalName()); 337 sb.append("]");338 614 break; 339 615 case XMLStreamConstants.CHARACTERS: 340 616 sb.append("CHARACTERS[\""); 341 sb.append(reader.getText().replace("\n", "\\n") 342 .replace("\r", "\\r").replace("\t", "\\t")); 617 sb.append(reader.getText() 618 .replace("\n", "\\n") 619 .replace("\r", "\\r") 620 .replace("\t", "\\t")); 343 621 sb.append("\", isWhitespace = "); 344 622 sb.append(reader.isWhiteSpace()); 345 sb.append("]");346 623 break; 347 624 case XMLStreamConstants.CDATA: … … 351 628 sb.append("\", isWhitespace = "); 352 629 sb.append(reader.isWhiteSpace()); 353 sb.append("]");354 630 break; 355 631 default: 356 632 sb.append(Integer.toString(reader.getEventType())); 357 633 } 634 Location location = reader.getLocation(); 635 sb.append(", row = ").append(location.getLineNumber()); 636 sb.append(", col = ").append(location.getColumnNumber()); 637 sb.append(", offset = ").append(location.getCharacterOffset()); 638 sb.append("]"); 358 639 return sb.toString(); 359 640 } 360 641 361 642 } // class XmlStreamReaderWrapper -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/SimpleHarvester.java
r1148 r3011 3 3 import java.net.URI; 4 4 import java.util.concurrent.TimeUnit; 5 import java.util.concurrent.atomic.AtomicLong; 6 7 import org.slf4j.Logger; 8 import org.slf4j.LoggerFactory; 5 9 6 10 import eu.clarin.cmdi.oai.harvester.HarvestHandler; … … 9 13 import eu.clarin.cmdi.oai.harvester.HarvesterException; 10 14 15 11 16 public final class SimpleHarvester extends AbstractHarvester { 12 private HarvestWorker worker; 13 private long nextId = 0; 17 private static final Logger logger = 18 LoggerFactory.getLogger(SimpleHarvester.class); 19 private final AtomicLong nextId = new AtomicLong(0); 20 private final HarvestWorker worker; 21 14 22 15 23 private SimpleHarvester() { 16 24 this.worker = new HarvestWorker(this); 17 25 } 26 18 27 19 28 @Override … … 22 31 throw new IllegalArgumentException("repositoryURI == null"); 23 32 } 24 long jobId; 25 synchronized (this) { 26 jobId = ++nextId; 27 } // synchronized 28 return new HarvestJobImpl(this, jobId, repositoryURI, handler); 33 return new HarvestJobImpl(this, nextId.incrementAndGet(), 34 repositoryURI, handler); 29 35 } 36 30 37 31 38 public static Harvester newInstance() { … … 33 40 } 34 41 42 35 43 @Override 36 44 public void shutdown() { 37 45 } 46 38 47 39 48 @Override … … 52 61 ((delay = job.getDelay(TimeUnit.MILLISECONDS)) > 0)) { 53 62 try { 54 System.err.println(job.getId() + ": (" + 55 job.getNetworkRetryCount() + ") waiting " + 56 delay + " ..."); 57 /* add a few milliseconds, so we do not loop */ 63 logger.debug("{}: ({}) waiting {} ...", 64 job.getId(), job.getNetworkRetryCount(), delay); 65 /* add a few milliseconds, so we do not loop */ 58 66 Thread.sleep(delay + 25); 59 67 } catch (InterruptedException e) { … … 64 72 } 65 73 74 66 75 @Override 67 76 void doCancelJob(HarvestJobImpl job) throws HarvesterException { … … 69 78 } 70 79 80 71 81 @Override 72 82 protected HarvestJob doFindJob(long id) throws HarvesterException { 73 throw new HarvesterException("findJob() is not supported"); 83 throw new 
HarvesterException("findJob() is not supported"); 74 84 } 75 85 -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/XmlStreamReaderProxy.java
r1128 r3011 15 15 private State state; 16 16 private int depth; 17 17 18 18 XmlStreamReaderProxy() { 19 19 super(); … … 35 35 XMLStreamConstants.ATTRIBUTE)) { 36 36 throw new IllegalStateException( 37 "Current event is not START_ELEMENT or ATTRIBUTE ");37 "Current event is not START_ELEMENT or ATTRIBUTE: " + reader.getLocation()); 38 38 } 39 39 return reader.getAttributeCount(); … … 45 45 XMLStreamConstants.ATTRIBUTE)) { 46 46 throw new IllegalStateException( 47 "Current event is not START_ELEMENT or ATTRIBUTE ");47 "Current event is not START_ELEMENT or ATTRIBUTE: " + reader.getLocation()); 48 48 } 49 49 return reader.getAttributeLocalName(index); … … 55 55 XMLStreamConstants.ATTRIBUTE)) { 56 56 throw new IllegalStateException( 57 "Current event is not START_ELEMENT or ATTRIBUTE ");57 "Current event is not START_ELEMENT or ATTRIBUTE: " + reader.getLocation()); 58 58 } 59 59 return reader.getAttributeName(index); … … 65 65 XMLStreamConstants.ATTRIBUTE)) { 66 66 throw new IllegalStateException( 67 "Current event is not START_ELEMENT or ATTRIBUTE ");67 "Current event is not START_ELEMENT or ATTRIBUTE: " + reader.getLocation()); 68 68 } 69 69 return reader.getAttributeNamespace(index); … … 75 75 XMLStreamConstants.ATTRIBUTE)) { 76 76 throw new IllegalStateException( 77 "Current event is not START_ELEMENT or ATTRIBUTE ");77 "Current event is not START_ELEMENT or ATTRIBUTE: " + reader.getLocation()); 78 78 } 79 79 return reader.getAttributePrefix(index); … … 85 85 XMLStreamConstants.ATTRIBUTE)) { 86 86 throw new IllegalStateException( 87 "Current event is not START_ELEMENT or ATTRIBUTE ");87 "Current event is not START_ELEMENT or ATTRIBUTE: " + reader.getLocation()); 88 88 } 89 89 return reader.getAttributeType(index); … … 95 95 XMLStreamConstants.ATTRIBUTE)) { 96 96 throw new IllegalStateException( 97 "Current event is not START_ELEMENT or ATTRIBUTE ");97 "Current event is not START_ELEMENT or ATTRIBUTE: " + reader.getLocation()); 98 98 } 99 99 return 
reader.getAttributeValue(index); … … 105 105 XMLStreamConstants.ATTRIBUTE)) { 106 106 throw new IllegalStateException( 107 "Current event is not START_ELEMENT or ATTRIBUTE ");107 "Current event is not START_ELEMENT or ATTRIBUTE: " + reader.getLocation()); 108 108 } 109 109 return reader.getAttributeValue(namespaceURI, localName); … … 119 119 if (!checkEventType(XMLStreamConstants.START_ELEMENT)) { 120 120 throw new IllegalStateException( 121 "Current event is not START_ELEMENT ");121 "Current event is not START_ELEMENT: " + reader.getLocation()); 122 122 } 123 123 return reader.getElementText(); … … 166 166 XMLStreamConstants.END_ELEMENT)) { 167 167 throw new IllegalStateException("Current event is not " + 168 "START_ELEMENT or END_ELEMENT ");168 "START_ELEMENT or END_ELEMENT: " + reader.getLocation()); 169 169 } 170 170 return reader.getName(); … … 176 176 XMLStreamConstants.END_ELEMENT, XMLStreamConstants.NAMESPACE)) { 177 177 throw new IllegalStateException("Current event is not " + 178 "START_ELEMENT, END_ELEMENT or NAMESPACE ");178 "START_ELEMENT, END_ELEMENT or NAMESPACE: " + reader.getLocation()); 179 179 } 180 180 return reader.getNamespaceContext(); … … 186 186 XMLStreamConstants.END_ELEMENT, XMLStreamConstants.NAMESPACE)) { 187 187 throw new IllegalStateException("Current event is not " + 188 "START_ELEMENT, END_ELEMENT or NAMESPACE ");188 "START_ELEMENT, END_ELEMENT or NAMESPACE: " + reader.getLocation()); 189 189 } 190 190 return reader.getNamespaceCount(); … … 196 196 XMLStreamConstants.END_ELEMENT, XMLStreamConstants.NAMESPACE)) { 197 197 throw new IllegalStateException("Current event is not " + 198 "START_ELEMENT, END_ELEMENT or NAMESPACE ");198 "START_ELEMENT, END_ELEMENT or NAMESPACE: " + reader.getLocation()); 199 199 } 200 200 return reader.getNamespacePrefix(index); … … 206 206 XMLStreamConstants.END_ELEMENT, XMLStreamConstants.NAMESPACE)) { 207 207 throw new IllegalStateException("Current event is not " + 208 "START_ELEMENT, END_ELEMENT or 
NAMESPACE ");208 "START_ELEMENT, END_ELEMENT or NAMESPACE: " + reader.getLocation()); 209 209 } 210 210 return reader.getNamespaceURI(); … … 221 221 XMLStreamConstants.END_ELEMENT, XMLStreamConstants.NAMESPACE)) { 222 222 throw new IllegalStateException("Current event is not " + 223 "START_ELEMENT, END_ELEMENT or NAMESPACE ");223 "START_ELEMENT, END_ELEMENT or NAMESPACE: " + reader.getLocation()); 224 224 } 225 225 return reader.getNamespaceURI(index); … … 230 230 if (!checkEventType(XMLStreamConstants.PROCESSING_INSTRUCTION)) { 231 231 throw new IllegalStateException( 232 "Current event is not PROCESSING_INSTRUCTION ");232 "Current event is not PROCESSING_INSTRUCTION: " + reader.getLocation()); 233 233 } 234 234 return reader.getPIData(); … … 239 239 if (!checkEventType(XMLStreamConstants.PROCESSING_INSTRUCTION)) { 240 240 throw new IllegalStateException( 241 "Current event is not PROCESSING_INSTRUCTION ");241 "Current event is not PROCESSING_INSTRUCTION: " + reader.getLocation()); 242 242 } 243 243 return reader.getPITarget(); … … 445 445 break; 446 446 default: 447 ;447 break; 448 448 } // switch (t) 449 449 if (depth < 0) { -
OAIHarvester/trunk/OAIHarvester/src/test/java/eu/clarin/cmdi/oai/harvester/HarvesterTest.java
r1906 r3011 15 15 import javax.xml.stream.XMLOutputFactory; 16 16 import javax.xml.stream.XMLStreamConstants; 17 import javax.xml.stream.XMLStreamException; 17 18 import javax.xml.stream.XMLStreamReader; 18 19 import javax.xml.stream.XMLStreamWriter; … … 105 106 } 106 107 } 107 108 108 109 @Override 109 110 public void onListMetadataFormats(List<MetadataFormat> metadataFormats) { … … 182 183 output.flush(); 183 184 } 184 } catch (Exception e) { 185 } catch (IOException e) { 186 e.printStackTrace(); 187 throw new RuntimeException("record failed"); 188 } catch (XMLStreamException e) { 185 189 e.printStackTrace(); 186 190 throw new RuntimeException("record failed"); … … 275 279 // cal.set(2010, Calendar.DECEMBER, 15); 276 280 // job.setFrom(cal.getTime()); 277 // job.setMetadataPrefixes(Arrays.asList(" oai_dc", "cmdi"));281 // job.setMetadataPrefixes(Arrays.asList("cmdi")); 278 282 job.run(); 279 283 284 /* 285 * Bad example of (busy) wait loop; should be done better, e.g. 286 * with CountDownLatch ... 287 */ 280 288 while (job.isRunning()) { 281 289 try { … … 372 380 } 373 381 382 383 static { 384 /* 385 * quick and dirty setup for Log4J; don't copy blindly! 386 */ 387 org.apache.log4j.BasicConfigurator 388 .configure(new org.apache.log4j.ConsoleAppender( 389 new org.apache.log4j.PatternLayout("%-5p [%t] %m%n"), 390 org.apache.log4j.ConsoleAppender.SYSTEM_ERR)); 391 org.apache.log4j.Logger logger = 392 org.apache.log4j.Logger.getRootLogger(); 393 logger.setLevel(org.apache.log4j.Level.INFO); 394 logger.getLoggerRepository().getLogger("eu.clarin") 395 .setLevel(org.apache.log4j.Level.DEBUG); 396 } 397 374 398 } // class HarvesterTest
Note: See TracChangeset
for help on using the changeset viewer.