Changeset 1137 for OAIHarvester
- Timestamp:
- 02/22/11 09:50:37 (13 years ago)
- Location:
- OAIHarvester/trunk/OAIHarvester/src
- Files:
-
- 1 added
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/HarvestHandler.java
r1128 r1137 84 84 public void onRecordAbout(Header header, XMLStreamReader reader); 85 85 86 /* XXX: move to different interface? */ 86 /** 87 * Optionally wrap input stream. 88 * 89 * @param stream 90 * @return 91 * @throws IOException 92 */ 87 93 public InputStream wrap(InputStream stream) throws IOException; 88 94 89 95 } // interface HarvestHandler -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/HarvestJob.java
r1128 r1137 13 13 }; 14 14 15 public String getId();15 public long getId(); 16 16 17 17 public State getState(); 18 19 public boolean isRunning(); 18 20 19 21 public long getRecordCount(); … … 34 36 35 37 public HarvestHandler getHarvestHandler(); 36 37 public void setHarvestHandler(HarvestHandler handler);38 38 39 39 public Date getFrom(); -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/Harvester.java
r1128 r1137 23 23 throws HarvesterException; 24 24 25 public void shutdown(); 26 25 27 } // interface Harvester -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/AbstractHarvester.java
r1128 r1137 20 20 private final XMLInputFactory factory; 21 21 // private final XMLValidationSchema schema; 22 23 22 24 23 protected AbstractHarvester() { 25 24 factory = XMLInputFactory2.newInstance(); … … 88 87 abstract void doCancelJob(HarvestJobImpl job) throws HarvesterException; 89 88 90 XMLStreamReader createReader(InputStream in) throws XMLStreamException { 89 protected abstract HarvestJob doFindJob(String id) 90 throws HarvesterException; 91 92 protected final XMLStreamReader createReader(InputStream in) 93 throws XMLStreamException { 91 94 XMLStreamReader2 reader = 92 95 (XMLStreamReader2) factory.createXMLStreamReader(in); … … 95 98 } 96 99 97 protected abstract HarvestJob doFindJob(String id)98 throws HarvesterException;99 100 100 } // abstract class AbstractHarvester -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestJobImpl.java
r1128 r1137 8 8 import java.util.Date; 9 9 import java.util.List; 10 import java.util.concurrent. atomic.AtomicReference;11 12 import javax.xml.stream.XMLStreamException; 10 import java.util.concurrent.Delayed; 11 import java.util.concurrent.TimeUnit; 12 13 13 import javax.xml.stream.XMLStreamReader; 14 14 … … 22 22 import eu.clarin.cmdi.oai.harvester.Repository.Granularity; 23 23 24 final class HarvestJobImpl implements HarvestJob {25 p rivateenum Task {24 final class HarvestJobImpl implements HarvestJob, Delayed { 25 public enum Task { 26 26 IDENTIFY_REPOSITORY, 27 27 ENUMERATE_METADATA_FORMATS, 28 INITALIZE_HARVEST_RECORDS,29 28 HARVEST_RECORDS 30 29 } 31 30 private final AbstractHarvester harvester; 32 private final String id;31 private final long id; 33 32 private final URI repositoryURI; 33 private final HarvestHandler handler; 34 34 private State state = State.NEW; 35 private AtomicReference<HarvestHandler> handler;36 35 private Date from; 37 36 private Date until; … … 53 52 private long timeTotal = 0; 54 53 private long timeNetwork = 0; 55 private long time Wait= 0;54 private long timeDelayed = 0; 56 55 private long timeProcessing = 0; 57 private long startTime ;56 private long startTimestamp; 58 57 private Task task = Task.IDENTIFY_REPOSITORY; 59 private List<String> availablePrefixes; 60 private String resumptionToken; 61 private Date latestDatestamp; 62 63 HarvestJobImpl(AbstractHarvester harvester, String id, URI repositoryURI, 58 private long delayUntil = 0; 59 private int networkTry = 0; 60 private List<String> prefixWorklist = null; 61 private String resumptionToken = null; 62 private Date latestDatestamp = null; 63 64 HarvestJobImpl(AbstractHarvester harvester, long id, URI repositoryURI, 64 65 HarvestHandler handler) { 65 66 this.harvester = harvester; 66 67 this.id = id; 67 68 this.repositoryURI = repositoryURI; 68 this.handler = new AtomicReference<HarvestHandler>(handler);69 } 70 71 @Override 72 public String getId() {69 this.handler = handler; 
70 } 71 72 @Override 73 public long getId() { 73 74 return id; 74 75 } … … 80 81 81 82 @Override 83 public boolean isRunning() { 84 return state == HarvestJob.State.RUNNING; 85 } 86 87 @Override 82 88 public long getRecordCount() { 83 89 return recordCount; … … 111 117 @Override 112 118 public long getWaitTime() { 113 return time Wait;119 return timeDelayed; 114 120 } 115 121 … … 122 128 @Override 123 129 public HarvestHandler getHarvestHandler() { 124 return handler.get(); 125 } 126 127 @Override 128 public void setHarvestHandler(HarvestHandler handler) { 129 this.handler.set(handler); 130 return handler; 130 131 } 131 132 … … 197 198 @Override 198 199 public void cancel() throws HarvesterException { 199 s tate = State.CANCELED;200 setState(State.CANCELED); 200 201 harvester.doCancelJob(this); 202 } 203 204 @Override 205 public long getDelay(TimeUnit unit) { 206 return unit.convert((delayUntil - System.currentTimeMillis()), 207 TimeUnit.MILLISECONDS); 208 } 209 210 @Override 211 public int compareTo(Delayed o) { 212 final long me = getDelay(TimeUnit.MICROSECONDS); 213 final long they = o.getDelay(TimeUnit.MILLISECONDS); 214 if (me > they) { 215 return 1; 216 } else if (me < they) { 217 return -1; 218 } else { 219 return 0; 220 } 221 } 222 223 @Override 224 public String toString() { 225 StringBuilder sb = new StringBuilder(); 226 sb.append(getClass().getCanonicalName()); 227 sb.append("[id="); 228 sb.append(id); 229 sb.append(", state="); 230 sb.append(state); 231 sb.append(", task="); 232 sb.append(task); 233 sb.append("]"); 234 return sb.toString(); 201 235 } 202 236 … … 206 240 207 241 void setState(State state) { 242 if ((this.state != State.RUNNING) && (state == State.RUNNING)) { 243 startTimestamp = System.currentTimeMillis(); 244 } 245 if ((this.state == State.RUNNING) && (state != State.RUNNING)) { 246 timeTotal = System.currentTimeMillis() - startTimestamp; 247 } 208 248 this.state = state; 209 249 } … … 257 297 } 258 298 259 void 
addToBytesTransferred(long bytes) {260 bytedTransferred += bytes;261 }262 263 299 void addToNetworkTime(long delta) { 264 300 timeNetwork += delta; 265 301 } 266 302 267 void addToWaitTime(long delta) { 268 timeWait += delta; 269 } 270 271 272 void addToProcessingTime(long delta) { 273 timeProcessing += delta; 303 void finishRequest(long bytesTransferred, long timeProcessingDelta) { 304 timeProcessing += timeProcessingDelta; 305 bytedTransferred += bytesTransferred; 306 } 307 308 boolean isState(State state) { 309 return this.state == state; 310 } 311 312 Task getTask() { 313 return task; 314 } 315 316 void setTask(Task task) { 317 this.task = task; 318 } 319 320 String getCurrentPrefix() { 321 if (prefixWorklist == null) { 322 prefixWorklist = new ArrayList<String>(); 323 if (metadataPrefixes != null) { 324 prefixWorklist.addAll(metadataPrefixes); 325 } else if (metadataFormats != null) { 326 for (MetadataFormat format : metadataFormats) { 327 prefixWorklist.add(format.getPrefix()); 328 } 329 } else { 330 prefixWorklist.add("oai_dc"); 331 } 332 } 333 return prefixWorklist.isEmpty() ? 
null : prefixWorklist.get(0); 334 } 335 336 boolean removeCurrentPrefix() { 337 prefixWorklist.remove(0); 338 return !prefixWorklist.isEmpty(); 274 339 } 275 340 … … 282 347 } 283 348 284 String getCurrentPrefix() { 285 return availablePrefixes.get(0); 286 } 287 288 boolean process(HarvestWorker worker) throws HarvesterException { 289 if (state == State.CANCELED) { 290 return true; 291 } 292 if (state != State.RUNNING) { 293 throw new HarvesterException("job is not in running state"); 294 } 295 try { 296 switch (task) { 297 case IDENTIFY_REPOSITORY: 298 startTime = System.currentTimeMillis(); 299 worker.doIdentify(this); 300 task = Task.ENUMERATE_METADATA_FORMATS; 301 break; 302 case ENUMERATE_METADATA_FORMATS: 303 worker.doListMetadataFormats(this); 304 task = Task.INITALIZE_HARVEST_RECORDS; 305 break; 306 case INITALIZE_HARVEST_RECORDS: 307 if (availablePrefixes == null) { 308 availablePrefixes = new ArrayList<String>(); 309 if (metadataPrefixes != null) { 310 availablePrefixes.addAll(metadataPrefixes); 311 } else if (metadataFormats != null) { 312 for (MetadataFormat format : metadataFormats) { 313 availablePrefixes.add(format.getPrefix()); 314 } 315 } else { 316 availablePrefixes.add("oai_dc"); 317 } 318 } 319 task = Task.HARVEST_RECORDS; 320 resumptionToken = null; 321 latestDatestamp = null; 322 onStartListingRecords(getCurrentPrefix()); 323 /* FALL-TROUGH */ 324 case HARVEST_RECORDS: 325 boolean more = worker.doListRecords(this); 326 if (!more) { 327 onFinishListingRecords(getCurrentPrefix()); 328 availablePrefixes.remove(0); 329 if (availablePrefixes.isEmpty()) { 330 timeTotal = System.currentTimeMillis() - startTime; 331 state = State.DONE; 332 return true; 333 } else { 334 task = Task.INITALIZE_HARVEST_RECORDS; 335 } 336 } 337 } 338 } catch (Throwable e) { 339 timeTotal = System.currentTimeMillis() - startTime; 340 state = State.ERROR; 341 if (e instanceof HarvesterException) { 342 throw (HarvesterException) e; 343 } else if (e instanceof 
XMLStreamException) { 344 throw new HarvesterException("malformed OAI response", e); 345 } else if (e instanceof IOException) { 346 throw new HarvesterException("network error", e); 347 } else { 348 throw new HarvesterException("unexpected exception", e); 349 } 350 } 351 return false; 352 } 353 354 void onStartListingRecords(String prefix) { 355 final HarvestHandler h = handler.get(); 356 if (h != null) { 357 h.onStartListingRecords(prefix); 358 } 359 } 360 361 void onFinishListingRecords(String prefix) { 362 final HarvestHandler h = handler.get(); 363 if (h != null) { 364 h.onFinishListingRecords(prefix, latestDatestamp); 365 } 349 boolean isHarvestingNewPrefix() { 350 return (resumptionToken == null) && (networkTry == 0); 351 } 352 353 int incNetworkRequestCount() { 354 return ++networkTry; 355 } 356 357 int getNetworkRequestCount() { 358 return networkTry; 359 } 360 361 void resetNetworkRequestCount() { 362 delayUntil = 0; 363 networkTry = 0; 364 } 365 366 void setNetworkRequestDelay(long delay) { 367 this.timeDelayed += delay; 368 this.delayUntil = System.currentTimeMillis() + delay; 369 } 370 371 void onStartListingRecords() { 372 if (handler != null) { 373 handler.onStartListingRecords(getCurrentPrefix()); 374 } 375 } 376 377 void onFinishListingRecords() { 378 if (handler != null) { 379 handler.onFinishListingRecords(getCurrentPrefix(), latestDatestamp); 380 } 381 // reset ... 
382 resumptionToken = null; 383 latestDatestamp = null; 366 384 } 367 385 … … 375 393 } 376 394 } 377 final HarvestHandler h = handler.get(); 378 if (h != null) { 379 h.onRecord(header); 395 if (handler != null) { 396 handler.onRecord(header); 380 397 } 381 398 } 382 399 383 400 void onRecordMetadata(Header header, XMLStreamReader reader) { 384 final HarvestHandler h = handler.get(); 385 if (h != null) { 386 h.onRecordMetadata(header, reader); 401 if (handler != null) { 402 handler.onRecordMetadata(header, reader); 387 403 } 388 404 } 389 405 390 406 void onRecordAbout(Header header, XMLStreamReader reader) { 391 final HarvestHandler h = handler.get(); 392 if (h != null) { 393 h.onRecordAbout(header, reader); 407 if (handler != null) { 408 handler.onRecordAbout(header, reader); 394 409 } 395 410 } 396 411 397 412 void onIdentify() { 398 final HarvestHandler h = handler.get(); 399 if (h != null) { 413 if (handler != null) { 400 414 final Repository repository = new Repository() { 401 415 @Override … … 439 453 } 440 454 }; 441 h .onIdentify(repository);455 handler.onIdentify(repository); 442 456 } 443 457 } … … 445 459 void onListMetadataFormats(List<MetadataFormat> metadataFormats) { 446 460 this.metadataFormats = metadataFormats; 447 final HarvestHandler h = handler.get(); 448 if (h != null) { 449 h.onListMetadataFormats(this.metadataFormats); 461 if (handler != null) { 462 handler.onListMetadataFormats(this.metadataFormats); 450 463 } 451 464 } 452 465 453 466 InputStream wrap(InputStream stream) throws IOException { 454 final HarvestHandler h = handler.get(); 455 if (h != null) { 456 return h.wrap(stream); 467 if (handler != null) { 468 return handler.wrap(stream); 457 469 } else { 458 470 return stream; … … 460 472 } 461 473 462 } // class HarvestJob 474 } // class HarvestJobImpl -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/HarvestWorker.java
r1128 r1137 23 23 import org.joda.time.format.DateTimeFormatter; 24 24 25 import eu.clarin.cmdi.oai.harvester.HarvestJob.State; 25 26 import eu.clarin.cmdi.oai.harvester.HarvesterException; 26 27 import eu.clarin.cmdi.oai.harvester.MetadataFormat; … … 28 29 import eu.clarin.cmdi.oai.harvester.Repository.DeletedNotion; 29 30 import eu.clarin.cmdi.oai.harvester.Repository.Granularity; 31 import eu.clarin.cmdi.oai.harvester.impl.HarvestJobImpl.Task; 30 32 31 33 public class HarvestWorker { … … 56 58 private final HeaderImpl header; 57 59 private final XmlStreamReaderProxy readerWrapper; 58 private int retries = 5;60 private long delay; 59 61 60 62 HarvestWorker(AbstractHarvester harvester) { … … 71 73 } 72 74 73 void doIdentify(HarvestJobImpl job) throws IOException, XMLStreamException, 74 HarvesterException { 75 long process(HarvestJobImpl job, int maxNetworkTries) 76 throws HarvesterException { 77 if (job.isState(State.CANCELED)) { 78 return -1; 79 } 80 if (!job.isState(State.RUNNING)) { 81 throw new HarvesterException("job is not in running state"); 82 } 83 84 boolean success; 85 try { 86 switch (job.getTask()) { 87 case IDENTIFY_REPOSITORY: 88 success = doIdentifyTask(job); 89 if (success) { 90 job.setTask(Task.ENUMERATE_METADATA_FORMATS); 91 } 92 break; 93 case ENUMERATE_METADATA_FORMATS: 94 success = doListMetadataFormatsTask(job); 95 if (success) { 96 job.setTask(Task.HARVEST_RECORDS); 97 } 98 break; 99 case HARVEST_RECORDS: 100 // check for new prefix and notify start, if needed 101 if (job.isHarvestingNewPrefix()) { 102 job.onStartListingRecords(); 103 } 104 success = doListRecordsTask(job); 105 if (success) { 106 // notify end of prefix 107 job.onFinishListingRecords(); 108 if (!job.removeCurrentPrefix()) { 109 job.setState(State.DONE); 110 } 111 } 112 break; 113 default: 114 throw new HarvesterException("internal error"); 115 } // switch 116 117 if (success) { 118 job.resetNetworkRequestCount(); 119 } else { 120 if (delay > 0) { 121 
job.setNetworkRequestDelay(delay); 122 } 123 if (job.incNetworkRequestCount() >= maxNetworkTries) { 124 throw new HarvesterException("too many network retries: " + 125 job.getNetworkRequestCount()); 126 } 127 } 128 return delay; 129 } catch (Throwable e) { 130 job.setState(State.ERROR); 131 if (e instanceof HarvesterException) { 132 throw (HarvesterException) e; 133 } else if (e instanceof XMLStreamException) { 134 throw new HarvesterException("malformed OAI response", e); 135 } else if (e instanceof IOException) { 136 throw new HarvesterException("network error", e); 137 } else { 138 throw new HarvesterException("unexpected exception", e); 139 } 140 } 141 } 142 143 private boolean doIdentifyTask(HarvestJobImpl job) throws IOException, 144 XMLStreamException, HarvesterException { 75 145 Response response = null; 76 146 try { 77 147 uriBuilder.setVerb(VERB_IDENTIFY); 78 148 response = execute(job, false); 149 if (response == null) { 150 return false; 151 } 79 152 80 153 response.readStart(OAI_NS, "Identify", true); … … 154 227 155 228 job.onIdentify(); 229 230 return true; 156 231 } finally { 157 232 if (response != null) { 158 response.release( );159 } 160 } 161 } 162 163 void doListMetadataFormats(HarvestJobImpl job) throws IOException,164 XMLStreamException, HarvesterException {233 response.release(job); 234 } 235 } 236 } 237 238 private boolean doListMetadataFormatsTask(HarvestJobImpl job) 239 throws IOException, XMLStreamException, HarvesterException { 165 240 Response response = null; 166 241 try { 167 242 uriBuilder.setVerb(VERB_LIST_METADATAFORMATS); 168 243 response = execute(job, true); 244 if (response == null) { 245 return false; 246 } 169 247 170 248 response.readStart(OAI_NS, "ListMetadataFormats", true); … … 187 265 188 266 job.onListMetadataFormats(metadataFormats); 267 268 return true; 189 269 } finally { 190 270 if (response != null) { 191 response.release( );192 } 193 } 194 } 195 196 boolean doListRecords(HarvestJobImpl job) throws IOException,271 
response.release(job); 272 } 273 } 274 } 275 276 private boolean doListRecordsTask(HarvestJobImpl job) throws IOException, 197 277 XMLStreamException, HarvesterException { 198 278 Response response = null; … … 220 300 221 301 response = execute(job, true); 302 if (response == null) { 303 return false; 304 } 305 222 306 String nextToken = null; 223 307 … … 232 316 header.setDeleted(true); 233 317 } else { 234 throw new XMLStreamException("attribute 'status of'" +318 throw new XMLStreamException("attribute 'status' of " + 235 319 "element 'record' must contain 'deleted'"); 236 320 } … … 283 367 response.close(); 284 368 285 return (nextToken != null);369 return (nextToken == null); 286 370 } finally { 287 371 if (response != null) { 288 response.release( );372 response.release(job); 289 373 } 290 374 } … … 294 378 throws IOException, XMLStreamException, HarvesterException { 295 379 long now = System.currentTimeMillis(); 296 do { 297 job.incRequestCount(); 298 final URI uri = uriBuilder.createURI(job.getRepositoryURI()); 299 System.err.println("HTTP GET " + uri); 300 final HttpGet request = new HttpGet(uri); 301 if (compress) { 302 if ((job.getCompressionMask() & 303 Repository.COMPRESSION_METHOD_GZIP) > 0) { 304 request.addHeader(HEADER_ACCEPT_ENCODING, "gzip"); 305 } 306 if ((job.getCompressionMask() & 307 Repository.COMPRESSION_METHOD_DEFLATE) > 0) { 308 request.addHeader(HEADER_ACCEPT_ENCODING, "deflate"); 309 } 310 request.addHeader(HEADER_ACCEPT_ENCODING, "identity"); 311 } 312 request.addHeader(HEADER_CONNECTION, "close"); 313 // FIXME: get timeout values from somewhere? 
314 HttpParams params = request.getParams(); 315 params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000); 316 params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000); 317 318 final HttpResponse response = client.execute(request); 319 long then = System.currentTimeMillis(); 320 job.addToNetworkTime(then - now); 321 int status = response.getStatusLine().getStatusCode(); 322 if (status == HttpStatus.SC_OK) { 323 return new Response(job, response.getEntity(), harvester); 324 } else { 325 request.abort(); 326 if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) { 327 if (retries > 0) { 328 System.err.println("UNAVAILABLE, wait & retry, " + 329 (retries - 1) + " left ..."); 330 try { 331 final long sleep = parseTryAfter(response); 332 System.err.println("-> sleep " + sleep); 333 Thread.sleep(sleep); 334 now = System.currentTimeMillis(); 335 job.addToWaitTime(now - then); 336 } catch (InterruptedException e) { 337 /* IGNORE */ 338 } 339 continue; 340 } else { 341 throw new HarvesterException("too many retries"); 342 } 343 } else { 344 throw new HarvesterException( 345 "provider returned unexpected HTTP status: " + 346 status); 347 } 348 } 349 } while (--retries > 0); 350 throw new HarvesterException("to many retries"); 380 job.incRequestCount(); 381 final URI uri = uriBuilder.createURI(job.getRepositoryURI()); 382 System.err.println("HTTP GET " + uri); 383 final HttpGet request = new HttpGet(uri); 384 if (compress) { 385 if ((job.getCompressionMask() & 386 Repository.COMPRESSION_METHOD_GZIP) > 0) { 387 request.addHeader(HEADER_ACCEPT_ENCODING, "gzip"); 388 } 389 if ((job.getCompressionMask() & 390 Repository.COMPRESSION_METHOD_DEFLATE) > 0) { 391 request.addHeader(HEADER_ACCEPT_ENCODING, "deflate"); 392 } 393 request.addHeader(HEADER_ACCEPT_ENCODING, "identity"); 394 } 395 request.addHeader(HEADER_CONNECTION, "close"); 396 // FIXME: get timeout values from somewhere? 
397 HttpParams params = request.getParams(); 398 params.setParameter(CoreConnectionPNames.SO_TIMEOUT, 180000); 399 params.setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000); 400 401 final HttpResponse response = client.execute(request); 402 job.addToNetworkTime(System.currentTimeMillis() - now); 403 int status = response.getStatusLine().getStatusCode(); 404 if (status == HttpStatus.SC_OK) { 405 delay = 0; 406 return new Response(job, response.getEntity(), harvester); 407 } else { 408 request.abort(); 409 if (status == HttpStatus.SC_SERVICE_UNAVAILABLE) { 410 System.err.println("-> " + status); 411 delay = parseTryAfter(response); 412 } else { 413 throw new HarvesterException( 414 "provider returned unexpected HTTP status: " + status); 415 } 416 } 417 return null; 351 418 } 352 419 -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/Response.java
r1134 r1137 66 66 private static final String OAI_NS = 67 67 "http://www.openarchives.org/OAI/2.0/"; 68 private final HarvestJobImpl job;69 68 private final HttpEntity entity; 70 69 private final long now = System.currentTimeMillis(); … … 74 73 Response(HarvestJobImpl job, HttpEntity entity, AbstractHarvester harvester) 75 74 throws IOException, XMLStreamException, HarvesterException { 76 this.job = job;77 75 this.entity = entity; 78 76 … … 122 120 } 123 121 124 public void release( ) {125 job. addToProcessingTime(System.currentTimeMillis() - now);126 job.addToBytesTransferred(stream.getByteCount());122 public void release(HarvestJobImpl job) { 123 job.finishRequest(stream.getByteCount(), 124 (System.currentTimeMillis() - now)); 127 125 try { 128 126 reader.close(); -
OAIHarvester/trunk/OAIHarvester/src/main/java/eu/clarin/cmdi/oai/harvester/impl/SimpleHarvester.java
r1128 r1137 2 2 3 3 import java.net.URI; 4 import java.util.concurrent.TimeUnit; 4 5 5 6 import eu.clarin.cmdi.oai.harvester.HarvestHandler; … … 21 22 throw new IllegalArgumentException("repositoryURI == null"); 22 23 } 23 String jobId;24 long jobId; 24 25 synchronized (this) { 25 jobId = "0x" + Long.toHexString(++nextId);26 jobId = ++nextId; 26 27 } // synchronized 27 28 return new HarvestJobImpl(this, jobId, repositoryURI, handler); … … 30 31 public static Harvester newInstance() { 31 32 return new SimpleHarvester(); 33 } 34 35 @Override 36 public void shutdown() { 32 37 } 33 38 … … 40 45 41 46 job.setState(HarvestJob.State.RUNNING); 42 boolean done = false; 43 while (!done) { 44 done = job.process(worker); 47 48 while (job.isRunning()) { 49 worker.process(job, 15); 50 long delay = -1; 51 while (job.isRunning() && 52 ((delay = job.getDelay(TimeUnit.MILLISECONDS)) > 0)) { 53 try { 54 System.err.println(job.getId() + ": (" + 55 job.getNetworkRequestCount() + ") waiting " + 56 delay + " ..."); 57 Thread.sleep(delay); 58 } catch (InterruptedException e) { 59 } 60 delay = job.getDelay(TimeUnit.MILLISECONDS); 61 } // while (wait loop) 45 62 } // while 46 63 } -
OAIHarvester/trunk/OAIHarvester/src/test/java/eu/clarin/cmdi/oai/harvester/HarvesterTest.java
r1134 r1137 18 18 import javax.xml.stream.XMLStreamWriter; 19 19 20 import eu.clarin.cmdi.oai.harvester.HarvestHandlerAdapter;21 20 import eu.clarin.cmdi.oai.harvester.impl.SimpleHarvester; 22 21 import eu.clarin.cmdi.oai.harvester.util.XMLStreamCopier; … … 31 30 private int reqNum; 32 31 private String prefix; 32 private byte[] buffer = new byte[8192]; 33 33 34 34 public MyHarvestHandler(File file) { … … 161 161 ZipEntry entry = new ZipEntry(filename); 162 162 output.putNextEntry(entry); 163 byte[] buffer = new byte[8192];164 163 int r = -1; 165 164 do { … … 188 187 final String repos = args[0]; 189 188 MyHarvestHandler handler = new MyHarvestHandler(new File(args[1])); 189 190 Harvester harvester = SimpleHarvester.newInstance(); 190 191 try { 191 Harvester harvester = SimpleHarvester.newInstance();192 192 HarvestJob job = harvester.createJob(repos, handler); 193 193 // Calendar cal = Calendar.getInstance(TimeZone.getDefault()); … … 197 197 job.setMetadataPrefixes(Arrays.asList("oai_dc")); 198 198 job.run(); 199 200 while (job.isRunning()) { 201 try { 202 Thread.sleep(1500); 203 } catch (InterruptedException e) { 204 } 205 } 199 206 200 207 System.err.println("==> " + job.getState() + ": "+ … … 219 226 } finally { 220 227 handler.close(); 228 harvester.shutdown(); 221 229 } 222 230 }
Note: See TracChangeset for help on using the changeset viewer.