diff --git a/src/main/clojure/cemerick/bandalore.clj b/src/main/clojure/cemerick/bandalore.clj index 23272db..3e13561 100644 --- a/src/main/clojure/cemerick/bandalore.clj +++ b/src/main/clojure/cemerick/bandalore.clj @@ -7,13 +7,15 @@ ; You must not remove this notice, or any other, from this software. (ns cemerick.bandalore - (:import com.amazonaws.services.sqs.AmazonSQSClient + (:import (com.amazonaws.services.sqs AmazonSQS AmazonSQSClient AmazonSQSAsyncClient) (com.amazonaws.services.sqs.model AddPermissionRequest ChangeMessageVisibilityRequest CreateQueueRequest DeleteMessageRequest DeleteQueueRequest GetQueueAttributesRequest ListQueuesRequest Message ReceiveMessageRequest ReceiveMessageResult RemovePermissionRequest SendMessageRequest SendMessageResult - SetQueueAttributesRequest)) + SetQueueAttributesRequest) + com.amazonaws.auth.BasicAWSCredentials + (com.amazonaws.services.sqs.buffered AmazonSQSBufferedAsyncClient QueueBufferConfig)) (:refer-clojure :exclude (send))) (defn create-client @@ -27,15 +29,28 @@ ([id secret-key] (create-client id secret-key (com.amazonaws.ClientConfiguration.))) ([id secret-key client-config] - (AmazonSQSClient. (com.amazonaws.auth.BasicAWSCredentials. id secret-key) + (AmazonSQSClient. (BasicAWSCredentials. id secret-key) (.withUserAgent client-config "Bandalore - SQS for Clojure")))) +(defn create-async-client + "Creates a asynchronous AmazonSQSClient using the provided account id and secret key." + ([id secret-key] + (AmazonSQSAsyncClient. (BasicAWSCredentials. id secret-key)))) + +(defn create-buffered-async-client + "Creates a buffered asynchronous AmazonSQSClient using the provided account id, secret key, + and optional com.amazonaws.services.sqs.buffered.QueueBufferConfig." + ([id secret-key] + (create-buffered-async-client id secret-key (QueueBufferConfig.))) + ([id secret-key client-config] + (AmazonSQSBufferedAsyncClient. (create-async-client id secret-key) client-config))) + (def ^{:private true} visibility-warned? (atom false)) (defn create-queue "Creates a queue with the given name, returning the corresponding URL string. Returns successfully if the queue already exists." - [^AmazonSQSClient client queue-name & {:as options}] + [^AmazonSQS client queue-name & {:as options}] (when (and (:visibility options) (not @visibility-warned?)) (println "[WARNING] :visibility option to cemerick.bandalore/create-queue no longer supported;") (println "[WARNING] See https://github.com/cemerick/bandalore/issues/3") @@ -46,12 +61,12 @@ (defn delete-queue "Deletes the queue specified by the given URL string." - [^AmazonSQSClient client queue-url] + [^AmazonSQS client queue-url] (.deleteQueue client (DeleteQueueRequest. queue-url))) (defn list-queues "Returns a seq of all queues' URL strings." - [^AmazonSQSClient client] + [^AmazonSQS client] (->> (ListQueuesRequest.) (.listQueues client) .getQueueUrls @@ -64,13 +79,13 @@ and values. Note that the SQS API only supports setting one queue attribute per request, so each attribute name/value pair in the provided `attr-map` will provoke a separate API request." - ([^AmazonSQSClient client queue-url] + ([^AmazonSQS client queue-url] (->> (.withAttributeNames (GetQueueAttributesRequest. queue-url) #{"All"}) (.getQueueAttributes client) .getAttributes (into {}))) - ([^AmazonSQSClient client queue-url attr-map] + ([^AmazonSQS client queue-url attr-map] (doseq [[k v] attr-map] (.setQueueAttributes client (SetQueueAttributesRequest. queue-url {(str k) (str v)}))))) @@ -78,7 +93,7 @@ (defn send "Sends a new message with the given string body to the queue specified by the string URL. Returns a map with :id and :body-md5 slots." - [^AmazonSQSClient client queue-url message] + [^AmazonSQS client queue-url message] (let [resp (.sendMessage client (SendMessageRequest. queue-url message))] {:id (.getMessageId resp) :body-md5 (.getMD5OfMessageBody resp)})) @@ -115,7 +130,7 @@ :receipt-handle - the ID used to delete the message from the queue after it has been fully processed. :source-queue - the URL of the queue from which the message was received" - [^AmazonSQSClient client queue-url & {:keys [limit visibility ^java.util.Collection attributes] + [^AmazonSQS client queue-url & {:keys [limit visibility ^java.util.Collection attributes] :or {limit 1 attributes #{}}}] (let [req (-> (ReceiveMessageRequest. queue-url) @@ -168,9 +183,9 @@ if the `queue-url` is not provided explicitly. Otherwise, `message` may be a message map, Message object, or the :receipt-handle value from a received message map." - ([^AmazonSQSClient client message] + ([^AmazonSQS client message] (delete client (:source-queue message) message)) - ([^AmazonSQSClient client queue-url message] + ([^AmazonSQS client queue-url message] (->> message receipt-handle (DeleteMessageRequest. queue-url) @@ -194,7 +209,7 @@ `message` may be a message map, Message object, or the :receipt-handle value from a received message map." - [^AmazonSQSClient client queue-url message visibility-timeout] + [^AmazonSQS client queue-url message visibility-timeout] (.changeMessageVisibility client (ChangeMessageVisibilityRequest. queue-url (receipt-handle message) visibility-timeout)))