source: SRUAggregator/trunk/src/main/java/eu/clarin/sru/fcs/aggregator/search/Search.java @ 6131

Last change on this file since 6131 was 6131, checked in by emanuel.dima@uni-tuebingen.de, 9 years ago
  1. beta-33: support for slow endpoints; POST external searches; UI errors for corner cases
File size: 5.7 KB
Line 
1package eu.clarin.sru.fcs.aggregator.search;
2
3import eu.clarin.sru.client.SRUVersion;
4import java.util.List;
5import eu.clarin.sru.client.SRUClientException;
6import eu.clarin.sru.client.SRUSearchRetrieveRequest;
7import eu.clarin.sru.client.SRUSearchRetrieveResponse;
8import eu.clarin.sru.client.fcs.ClarinFCSRecordData;
9import eu.clarin.sru.fcs.aggregator.client.ThrottledClient;
10import eu.clarin.sru.fcs.aggregator.scan.Corpus;
11import eu.clarin.sru.fcs.aggregator.scan.Diagnostic;
12import eu.clarin.sru.fcs.aggregator.scan.FCSProtocolVersion;
13import eu.clarin.sru.fcs.aggregator.scan.Statistics;
14import eu.clarin.sru.fcs.aggregator.util.SRUCQL;
15import java.util.ArrayList;
16import java.util.Collections;
17import java.util.Random;
18import java.util.concurrent.atomic.AtomicLong;
19import org.slf4j.LoggerFactory;
20
21/**
22 * Class representing a search operation
23 *
24 * @author Yana Panchenko
25 * @author edima
26 */
27public class Search {
28
29        private static final org.slf4j.Logger log = LoggerFactory.getLogger(Search.class);
30
31        private static final String SEARCH_RESULTS_ENCODING = "UTF-8";
32
33        private static final AtomicLong counter = new AtomicLong(Math.abs(new Random().nextInt()));
34
35        private final ThrottledClient searchClient;
36        private final SRUVersion version;
37        private final Long id;
38        private final String query;
39        private final long createdAt = System.currentTimeMillis();
40        private final String searchLanguage;
41        private final List<Result> results = Collections.synchronizedList(new ArrayList<Result>());
42        private final Statistics statistics;
43
44        public Search(ThrottledClient searchClient,
45                        SRUVersion version,
46                        Statistics statistics, List<Corpus> corpora, String searchString,
47                        String searchLanguage, int maxRecords
48        ) {
49                this.searchClient = searchClient;
50                this.version = version;
51                this.id = counter.getAndIncrement();
52                this.query = searchString;
53                this.searchLanguage = searchLanguage;
54                this.statistics = statistics;
55                for (Corpus corpus : corpora) {
56                        Result result = new Result(corpus);
57                        executeSearch(result, query, 1, maxRecords);
58                        results.add(result);
59                }
60        }
61
62        public boolean searchForNextResults(String corpusId, int maxRecords) {
63                for (Result r : results) {
64                        if (r.getCorpus().getId().equals(corpusId)) {
65                                executeSearch(r, query, r.getEndpointReturnedRecords() + 1, maxRecords);
66                                return true;
67                        }
68                }
69                return false;
70        }
71
72        private void executeSearch(final Result result, String searchString,
73                        int startRecord, int maxRecords) {
74                final Corpus corpus = result.getCorpus();
75                log.info("Executing search in '{}' query='{}' maxRecords='{}'",
76                                corpus, searchString, maxRecords);
77
78                SRUSearchRetrieveRequest searchRequest = new SRUSearchRetrieveRequest(corpus.getEndpoint().getUrl());
79                searchRequest.setVersion(version);
80                searchRequest.setStartRecord(startRecord);
81                searchRequest.setMaximumRecords(maxRecords);
82                boolean legacy = corpus.getEndpoint().getProtocol().equals(FCSProtocolVersion.LEGACY);
83                searchRequest.setRecordSchema(legacy
84                                ? ClarinFCSRecordData.LEGACY_RECORD_SCHEMA
85                                : ClarinFCSRecordData.RECORD_SCHEMA);
86                searchRequest.setQuery(searchString);
87                if (corpus.getHandle() != null) {
88                        searchRequest.setExtraRequestData(legacy
89                                        ? SRUCQL.SEARCH_CORPUS_HANDLE_LEGACY_PARAMETER
90                                        : SRUCQL.SEARCH_CORPUS_HANDLE_PARAMETER,
91                                        corpus.getHandle());
92                }
93
94                statistics.initEndpoint(corpus.getInstitution(), corpus.getEndpoint(),
95                                searchClient.getMaxConcurrentRequests(true, corpus.getEndpoint().getUrl()));
96                result.setInProgress(true);
97
98                try {
99                        searchClient.searchRetrieve(searchRequest, new ThrottledClient.SearchCallback() {
100                                @Override
101                                public void onSuccess(SRUSearchRetrieveResponse response, ThrottledClient.Stats stats) {
102                                        try {
103                                                statistics.addEndpointDatapoint(corpus.getInstitution(), corpus.getEndpoint(), stats.getQueueTime(), stats.getExecutionTime());
104                                                result.addResponse(response);
105                                                List<Diagnostic> diagnostics = result.getDiagnostics();
106                                                if (diagnostics != null && !diagnostics.isEmpty()) {
107                                                        log.error("diagnostic for url: " + response.getRequest().getRequestedURI().toString());
108                                                        for (Diagnostic diagnostic : diagnostics) {
109                                                                statistics.addEndpointDiagnostic(corpus.getInstitution(), corpus.getEndpoint(),
110                                                                                diagnostic, response.getRequest().getRequestedURI().toString());
111                                                        }
112                                                }
113                                        } catch (Throwable xc) {
114                                                log.error("search.onSuccess exception:", xc);
115                                        } finally {
116                                                result.setInProgress(false);
117                                        }
118                                }
119
120                                @Override
121                                public void onError(SRUSearchRetrieveRequest srureq, SRUClientException xc, ThrottledClient.Stats stats) {
122                                        try {
123                                                statistics.addEndpointDatapoint(corpus.getInstitution(), corpus.getEndpoint(), stats.getQueueTime(), stats.getExecutionTime());
124                                                statistics.addErrorDatapoint(corpus.getInstitution(), corpus.getEndpoint(), xc, srureq.getRequestedURI().toString());
125                                                result.setException(xc);
126                                                log.error("search.onError: ", xc);
127                                        } catch (Throwable xxc) {
128                                                log.error("search.onError exception:", xxc);
129                                        } finally {
130                                                result.setInProgress(false);
131                                        }
132                                }
133                        });
134                } catch (Throwable xc) {
135                        log.error("SearchRetrieve error for " + corpus.getEndpoint().getUrl(), xc);
136                }
137        }
138
139        public Long getId() {
140                return id;
141        }
142
143        public List<Result> getResults(String corpusId) {
144                List<Result> copy = new ArrayList<>();
145                synchronized (results) {
146                        if (corpusId == null || corpusId.isEmpty()) {
147                                copy.addAll(results);
148                        } else {
149                                for (Result r : results) {
150                                        if (corpusId.equals(r.getCorpus().getId())) {
151                                                copy.add(r);
152                                        }
153                                }
154                        }
155                }
156                return copy;
157        }
158
159        public Statistics getStatistics() {
160                return statistics;
161        }
162
163        public void shutdown() {
164                // nothing to do
165        }
166
167        public long getCreatedAt() {
168                return createdAt;
169        }
170
171        public String getQuery() {
172                return query;
173        }
174
175        public String getSearchLanguage() {
176                return searchLanguage;
177        }
178
179}
Note: See TracBrowser for help on using the repository browser.