NIO and SQS (2): Connect with SQS and SOLR
祝锐
2023-12-01
NIO and SQS (2): Connect with SQS and SOLR
Main class: ExecutorApp.java
package com.sillycat.jobssolrconsumer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.sillycat.jobssolrconsumer.service.MessageService;
import com.sillycat.jobssolrconsumer.service.MessageServiceImpl;
import com.sillycat.jobssolrconsumer.util.MessageUtil;
import io.ino.solrs.JavaAsyncSolrClient;
public class ExecutorApp {
private static final String queueURL = "https://sqs.us-east-1.amazonaws.com/xxxx/prod-solranalytics";
private static final String solrURL = "http://localhost:8983/solr/job";
private static final int PARALLELISM = 10;
private static final int MESSAGES_PER_REQUEST = 10;
private static final Logger logger = LoggerFactory.getLogger(ExecutorApp.class);
private static final long STARTTIME = System.currentTimeMillis();
private static MessageService messageService = new MessageServiceImpl();
public static void main(String[] args) throws InterruptedException, ExecutionException {
logger.info("Jobs SOLR7 Consumer start to run here!");
logger.info("-----------init the connection to sqs/solr start------------------");
AmazonSQSAsync sqsClient = AmazonSQSAsyncClient.asyncBuilder().withRegion(Regions.US_EAST_1).build();
JavaAsyncSolrClient solrClient = JavaAsyncSolrClient.create(solrURL);
logger.info("-----------init the connection to sqs/solr end ------------------");
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest().withQueueUrl(queueURL)
.withMaxNumberOfMessages(MESSAGES_PER_REQUEST);
int messageCount = 0;
IntStream.range(1, PARALLELISM).forEach((i) -> {
processMessages(sqsClient, solrClient, receiveMessageRequest, "worker-" + i, messageCount);
});
}
private static CompletableFuture<Integer> processMessages(AmazonSQSAsync sqsClient, JavaAsyncSolrClient solrClient,
ReceiveMessageRequest receiveMessageRequest, String worker, final Integer messageCount) {
SqsFutureReceiver<ReceiveMessageRequest, ReceiveMessageResult> processSqsMessageFuture = new SqsFutureReceiver<>();
// send out the requests
sqsClient.receiveMessageAsync(receiveMessageRequest, processSqsMessageFuture);
// compose the CompletableFuture
return processSqsMessageFuture.thenCompose(
(receiveMessageResult) -> addSqsJobsToSolr(solrClient, worker, messageCount, receiveMessageResult))
.whenComplete((o, ex) -> {
if (ex != null)
logger.error("Unable process SQS messages or write to Solr");
}).thenCompose((messagesAndSolrResponse) -> deleteSuccessfullyProcessedMessagesFromSqs(sqsClient,
messagesAndSolrResponse))
.whenComplete((o, ex) -> {
if (ex != null)
logger.error("Unable to delete successfully processed SQS messages");
}).thenApply((deleteResults) -> processSqsDeleteResults(messageCount, deleteResults))
.exceptionally((ex) -> {
logger.error("Unable to process SQS job messages", ex);
return messageCount;
}).thenCompose((messagesProcessed) -> processMessages(sqsClient, solrClient, receiveMessageRequest,
worker, messagesProcessed));
}
private static CompletionStage<SqsMessagesSolrResponse> addSqsJobsToSolr(JavaAsyncSolrClient solrClient,
String worker, final Integer messageCount, ReceiveMessageResult receiveMessageResult) {
logger.trace("Process the SQS message contents");
final List<Message> messages = receiveMessageResult.getMessages();
List<SolrInputDocument> solrJobs = convertSqsMessageToSolrJob(worker, messageCount, messages);
logger.trace("Send to Solr");
// return solrClient.addDocs(solrJobs);
CompletionStage<UpdateResponse> solrResponse = solrClient.addDocs(solrJobs);
return solrResponse.thenApply((updateResponse) -> new SqsMessagesSolrResponse(messages, updateResponse));
}
private static Integer processSqsDeleteResults(final int messageCount, DeleteMessageBatchResult deleteResults) {
if (deleteResults.getFailed().size() != 0) {
logger.error("Unable to process {} messages", deleteResults.getFailed().size());
}
return deleteResults.getSuccessful().size() + messageCount;
}
private static CompletionStage<DeleteMessageBatchResult> deleteSuccessfullyProcessedMessagesFromSqs(
AmazonSQSAsync sqsClient, SqsMessagesSolrResponse messagesAndSolrResponse) {
if (messagesAndSolrResponse.getUpdateResponse().getStatus() != 0) {
throw new RuntimeException("Error response status of “ + messagesAndSolrResponse.getUpdateResponse().getStatus() + " from SOLR server");
}
List<Message> messages = messagesAndSolrResponse.getMessages();
logger.trace("Return future of Solr success with original SQS Messages");
// TODO consider always deleting messages that fail to process
// TODO delete all messages that were processed
List<DeleteMessageBatchRequestEntry> deleteEntries = messages.stream()
.map((m) -> new DeleteMessageBatchRequestEntry().withId(m.getReceiptHandle())
.withReceiptHandle(m.getReceiptHandle()))
.collect(Collectors.toList());
SqsFutureReceiver<DeleteMessageBatchRequest, DeleteMessageBatchResult> deleteFuture = new SqsFutureReceiver<>();
sqsClient.deleteMessageBatchAsync(
new DeleteMessageBatchRequest().withQueueUrl(queueURL).withEntries(deleteEntries), deleteFuture);
return deleteFuture;
}
private static List<SolrInputDocument> convertSqsMessageToSolrJob(String worker, final int messageCount,
final List<Message> messages) {
// logging
int nextMessageCount = messageCount + messages.size();
if (nextMessageCount % 100 == 0) {
logger.info(worker + " message count " + nextMessageCount);
long currentTime = System.currentTimeMillis();
long duration = (currentTime - STARTTIME) / 1000;
logger.info(nextMessageCount * 10 / duration + " messages/second");
}
logger.trace("Create Solr request");
List<SolrInputDocument> solrJobs = messages.stream().flatMap(sqsMessage -> {
// uncompress the messages
String snsMsg = sqsMessage.getBody();
List<Map<String, Object>> maps = MessageUtil.decodeMessages(snsMsg);
return maps.stream().map(map -> {
return messageService.convertHashMap2SolrInputDocument(map);
});
}).collect(Collectors.toList());
return solrJobs;
}
public static class SqsMessagesSolrResponse {
private final List<Message> messages;
private final UpdateResponse updateResponse;
public SqsMessagesSolrResponse(List<Message> messages, UpdateResponse updateResponse) {
super();
this.messages = messages;
this.updateResponse = updateResponse;
}
public List<Message> getMessages() {
return messages;
}
public UpdateResponse getUpdateResponse() {
return updateResponse;
}
}
}
Helper class: SqsFutureReceiver.java
package com.sillycat.jobssolrconsumer;
import java.util.concurrent.CompletableFuture;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.AsyncHandler;
/**
 * Bridges the AWS SDK callback style ({@link AsyncHandler}) to a {@link CompletableFuture}. Pass an instance
 * as the handler argument of an SDK {@code ...Async} call, then compose on it like any other future.
 *
 * @param <Q> the AWS request type
 * @param <R> the result type delivered to the handler
 */
public class SqsFutureReceiver<Q extends AmazonWebServiceRequest, R> extends CompletableFuture<R>
        implements AsyncHandler<Q, R> {

    /** Completes this future normally with the SDK's result; the request object is not used. */
    @Override
    public void onSuccess(Q request, R response) {
        this.complete(response);
    }

    /** Completes this future exceptionally with the error the SDK reported. */
    @Override
    public void onError(Exception ex) {
        this.completeExceptionally(ex);
    }
}
References: