Changeset 5901 for SRUAggregator
- Timestamp:
- 12/11/14 15:58:31 (9 years ago)
- Location:
- SRUAggregator/trunk
- Files:
-
- 11 edited
Legend:
- Unmodified
- Added
- Removed
-
SRUAggregator/trunk/aggregator_development.yml
r5900 r5901 1 1 aggregatorParams: 2 CENTER_REGISTRY_URL: http://cent erregistry-clarin.esc.rzg.mpg.de/restxml/2 CENTER_REGISTRY_URL: http://centres.clarin.eu/restxml/ 3 3 WEBLICHT_URL: https://weblicht.sfs.uni-tuebingen.de/WebLicht-4/?input= 4 4 5 5 AGGREGATOR_FILE_PATH: /Users/edima/fcsAggregatorCorpora.json 6 SCAN_MAX_DEPTH: 16 SCAN_MAX_DEPTH: 3 7 7 SCAN_TASK_INITIAL_DELAY: 0 8 SCAN_TASK_INTERVAL: 68 SCAN_TASK_INTERVAL: 24 9 9 SCAN_TASK_TIME_UNIT: HOURS 10 10 11 ENDPOINTS_SCAN_TIMEOUT_MS: 60000 12 ENDPOINTS_SEARCH_TIMEOUT_MS: 5000 11 SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT: 4 # @depth2: 1=361s; 2=225s; 4=207s 12 SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT: 4 13 14 15 ENDPOINTS_SCAN_TIMEOUT_MS: 10000 16 ENDPOINTS_SEARCH_TIMEOUT_MS: 10000 13 17 EXECUTOR_SHUTDOWN_TIMEOUT_MS: 100 14 18 -
SRUAggregator/trunk/pom.xml
r5900 r5901 8 8 <groupId>eu.clarin.sru.fcs</groupId> 9 9 <artifactId>Aggregator2</artifactId> 10 <version>2.0.0-alpha- 9</version>10 <version>2.0.0-alpha-10</version> 11 11 <name>FCS Aggregator</name> 12 12 -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/Aggregator.java
r5900 r5901 80 80 * @author edima 81 81 * 82 * TODO: version page: credits, open source, see vcr/version page 83 * 84 * TODO: change the order in the GUI: selected collections after the 85 * language/layer 86 * 87 * TODO: condensed list of corpora 88 * 89 * TODO: group the list of corpora by institution? 90 * 91 * TODO: fix ordering of corpora in corpora view 92 * 93 * TODO: corpora search should not indicate the ones that don't match 94 * 95 * TODO: try to refine by language using a language library, with UI element 96 * 97 * TODO: helpdesk: switch to english (parameter of the form) 98 * 99 * TODO: label: "phonetic transcriptions" (ask the BAS guys) 100 * 101 * TODO: number of results control: make the buttons larger 102 * 103 * TODO: Collections view: home link (make a single consistent text for it) 104 * 82 105 * TODO: push footer down 83 106 * … … 107 130 * 108 131 * TODO: show multiple hits on the same result in multiple rows, linked visually 132 * 133 * TODO: optimise page load 109 134 * 110 135 * TODO: improve help page text … … 190 215 log.info("Aggregator initialization started."); 191 216 sruScanClient = new ThrottledClient( 192 217 new ClarinFCSClientBuilder() 193 218 .setConnectTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS) 194 219 .setSocketTimeout(params.ENDPOINTS_SCAN_TIMEOUT_MS) 195 220 .addDefaultDataViewParsers() 196 221 .enableLegacySupport() 197 .buildThreadedClient()); 222 .buildThreadedClient(), 223 params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT, 224 params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT 225 ); 198 226 sruSearchClient = new ThrottledClient( 199 227 new ClarinFCSClientBuilder() … … 202 230 .addDefaultDataViewParsers() 203 231 .enableLegacySupport() 204 .buildThreadedClient()); 232 .buildThreadedClient(), 233 params.SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT, 234 params.SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT 235 ); 205 236 206 237 File corporaCacheFile = new File(params.AGGREGATOR_FILE_PATH); -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/app/AggregatorConfiguration.java
r5900 r5901 42 42 @JsonProperty 43 43 @Range 44 int SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT; 45 46 @JsonProperty 47 @Range 48 int SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT; 49 50 @JsonProperty 51 @Range 44 52 int ENDPOINTS_SCAN_TIMEOUT_MS; 45 53 … … 66 74 return ENDPOINTS_SEARCH_TIMEOUT_MS; 67 75 } 76 77 @JsonIgnore 78 public int getSCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT() { 79 return SCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT; 80 } 81 82 @JsonIgnore 83 public int getSEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT() { 84 return SEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT; 85 } 68 86 } 69 70 87 public Params aggregatorParams = new Params(); 71 88 } -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/client/GenericClient.java
r5893 r5901 14 14 class GenericClient { 15 15 private final SRUThreadedClient sruClient; 16 public final int maxConcurrentRequests; 16 17 // queue of operations waiting for execution 17 private final Map<URI, Queue<Operation>> endpoint2exec = new HashMap<URI, Queue<Operation>>(); 18 private final int nowExecuting = 0; 18 19 static class ExecQueue { 20 21 int nowExecuting = 0; 22 Queue<Operation> queue = new ArrayDeque<>(); 23 } 24 private final Map<URI, ExecQueue> endpointMap = new HashMap<URI, ExecQueue>(); 19 25 private final Object lock = new Object(); 20 26 21 GenericClient(SRUThreadedClient sruClient ) {27 GenericClient(SRUThreadedClient sruClient, int maxConcurrentRequests) { 22 28 this.sruClient = sruClient; 29 this.maxConcurrentRequests = maxConcurrentRequests; 23 30 } 24 31 … … 26 33 op.setClient(this); 27 34 synchronized (lock) { 28 if (!endpoint2exec.containsKey(endpoint)) { 29 endpoint2exec.put(endpoint, new ArrayDeque<Operation>()); 35 ExecQueue eq = endpointMap.get(endpoint); 36 if (eq == null) { 37 eq = new ExecQueue(); 38 endpointMap.put(endpoint, eq); 30 39 } 31 40 op.stats().enqueuedTime = System.currentTimeMillis(); 32 endpoint2exec.get(endpoint).add(op); 41 eq.queue.add(op); 42 eq.nowExecuting++; // counter the following decrement in executeNext 33 43 executeNextOperationOfEndpoint(endpoint); 34 44 } … … 37 47 void executeNextOperationOfEndpoint(URI endpoint) { 38 48 synchronized (lock) { 39 Queue<Operation> queue = endpoint2exec.get(endpoint); 40 if (queue == null || queue.isEmpty()) { 49 ExecQueue eq = endpointMap.get(endpoint); 50 eq.nowExecuting--; // assume an operation just finished 51 if (eq.queue.isEmpty()) { 41 52 return; 42 53 } 43 if ( nowExecuting >= ThrottledClient.MAX_CONCURRENT_REQUESTS) {54 if (eq.nowExecuting >= maxConcurrentRequests) { 44 55 return; 45 56 } 46 Operation op = queue.poll(); 57 eq.nowExecuting++; 58 Operation op = eq.queue.poll(); 47 59 op.stats().startedTime = System.currentTimeMillis(); 48 60 op.execute(sruClient); -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/client/ScanOperation.java
r5894 r5901 33 33 sruClient.scan(request, this); 34 34 } catch (SRUClientException xc) { 35 stats.finishedTime = System.currentTimeMillis(); 36 callback.onError(request, xc, stats); 35 onError(request, xc); 37 36 } 38 37 } -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/client/ThrottledClient.java
r5893 r5901 12 12 */ 13 13 public class ThrottledClient { 14 15 public final static int MAX_CONCURRENT_REQUESTS = 8;16 14 17 15 public interface Stats { … … 40 38 GenericClient searchClient; 41 39 42 public ThrottledClient(SRUThreadedClient sruClient ) {40 public ThrottledClient(SRUThreadedClient sruClient, int maxConcurrentScanRequests, int maxConcurrentSearchRequests) { 43 41 this.sruClient = sruClient; 44 this.scanClient = new GenericClient(sruClient );45 this.searchClient = new GenericClient(sruClient );42 this.scanClient = new GenericClient(sruClient, maxConcurrentScanRequests); 43 this.searchClient = new GenericClient(sruClient, maxConcurrentSearchRequests); 46 44 } 47 45 -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/rest/RestService.java
r5900 r5901 75 75 put("lastScanStats", new HashMap<String, Object>() { 76 76 { 77 put("maxConcurrentRequestsPerEndpoint", ThrottledClient.MAX_CONCURRENT_REQUESTS); 77 put("maxConcurrentScanRequestsPerEndpoint", 78 Aggregator.getInstance().getParams().getSCAN_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT()); 78 79 put("timeout", params.getENDPOINTS_SCAN_TIMEOUT_MS() / 1000.); 79 80 put("institutions", scan.getInstitutions()); … … 82 83 put("searchStats", new HashMap<String, Object>() { 83 84 { 84 put("maxConcurrentRequestsPerEndpoint", ThrottledClient.MAX_CONCURRENT_REQUESTS); 85 put("maxConcurrentSearchRequestsPerEndpoint", 86 Aggregator.getInstance().getParams().getSEARCH_MAX_CONCURRENT_REQUESTS_PER_ENDPOINT()); 85 87 put("timeout", params.getENDPOINTS_SEARCH_TIMEOUT_MS() / 1000.); 86 88 put("institutions", search.getInstitutions()); -
SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/scan/ScanCrawler.java
r5900 r5901 89 89 } 90 90 91 log.info("{} Start scan: {} ", latch.get(), st.endpointUrl);91 log.info("{} Start scan: {}#{}", latch.get(), st.endpointUrl, normalizeHandle(st.parentCorpus)); 92 92 latch.increment(); 93 93 sruClient.scan(scanRequest, st); … … 144 144 } 145 145 146 log.info("{} Finished scan: {} ", latch.get(), endpointUrl);146 log.info("{} Finished scan: {}#{}", latch.get(), endpointUrl, normalizeHandle(parentCorpus)); 147 147 } catch (Exception xc) { 148 log.error("{} Exception in callback {} ", latch.get(), endpointUrl);148 log.error("{} Exception in callback {}#{}", latch.get(), endpointUrl, normalizeHandle(parentCorpus)); 149 149 log.error("--> ", xc); 150 150 } finally { … … 155 155 @Override 156 156 public void onError(SRUScanRequest request, SRUClientException error, ThrottledClient.Stats stats) { 157 latch.decrement(); 158 log.error("{} Error while scanning {}: {}", latch.get(), endpointUrl, error.getMessage()); 159 statistics.addEndpointDatapoint(institution, endpointUrl, stats.getQueueTime(), stats.getExecutionTime()); 160 statistics.addErrorDatapoint(institution, endpointUrl, error); 161 if (Throw.isCausedBy(error, SocketTimeoutException.class)) { 162 return; 163 } 164 log.error("--> " + request.getBaseURI() + "?" + request.getScanClause() + " --> ", error); 157 try { 158 log.error("{} Error while scanning {}#{}: {}", latch.get(), endpointUrl, normalizeHandle(parentCorpus), error.getMessage()); 159 statistics.addEndpointDatapoint(institution, endpointUrl, stats.getQueueTime(), stats.getExecutionTime()); 160 statistics.addErrorDatapoint(institution, endpointUrl, error); 161 if (Throw.isCausedBy(error, SocketTimeoutException.class)) { 162 return; 163 } 164 log.error("--> " + request.getBaseURI() + "?" + request.getScanClause() + " --> ", error); 165 } finally { 166 latch.decrement(); 167 } 165 168 } 166 169 } -
SRUAggregator/trunk/src/main/resources/assets/index.html
r5900 r5901 24 24 <a title="about" id="aboutlink"> 25 25 <span class="glyphicon glyphicon-info-sign"></span> 26 <span>VERSION 2.0.0.α 9</span>26 <span>VERSION 2.0.0.α10</span> 27 27 </a> 28 28 </div> -
SRUAggregator/trunk/src/test/java/eu/clarin/sru/fcs/aggregator/app/ScanCrawlerTest.java
r5893 r5901 30 30 new ClarinFCSClientBuilder() 31 31 .addDefaultDataViewParsers() 32 .buildThreadedClient() );32 .buildThreadedClient(), 2, 2); 33 33 34 34 try {
Note: See TracChangeset
for help on using the changeset viewer.