Extend to use the com.amazonaws.services.sqs.buffered.AmazonSQSBufferedAsyncClient.

This commit is contained in:
Emil Yessenamanov 2013-11-05 10:04:08 +01:00
parent b7d186125c
commit 371681b03a

View file

@ -7,13 +7,15 @@
; You must not remove this notice, or any other, from this software. ; You must not remove this notice, or any other, from this software.
(ns cemerick.bandalore (ns cemerick.bandalore
(:import com.amazonaws.services.sqs.AmazonSQSClient (:import (com.amazonaws.services.sqs AmazonSQS AmazonSQSClient AmazonSQSAsyncClient)
(com.amazonaws.services.sqs.model (com.amazonaws.services.sqs.model
AddPermissionRequest ChangeMessageVisibilityRequest CreateQueueRequest AddPermissionRequest ChangeMessageVisibilityRequest CreateQueueRequest
DeleteMessageRequest DeleteQueueRequest GetQueueAttributesRequest DeleteMessageRequest DeleteQueueRequest GetQueueAttributesRequest
ListQueuesRequest Message ReceiveMessageRequest ReceiveMessageResult ListQueuesRequest Message ReceiveMessageRequest ReceiveMessageResult
RemovePermissionRequest SendMessageRequest SendMessageResult RemovePermissionRequest SendMessageRequest SendMessageResult
SetQueueAttributesRequest)) SetQueueAttributesRequest)
com.amazonaws.auth.BasicAWSCredentials
(com.amazonaws.services.sqs.buffered AmazonSQSBufferedAsyncClient QueueBufferConfig))
(:refer-clojure :exclude (send))) (:refer-clojure :exclude (send)))
(defn create-client (defn create-client
@ -27,15 +29,28 @@
([id secret-key] ([id secret-key]
(create-client id secret-key (com.amazonaws.ClientConfiguration.))) (create-client id secret-key (com.amazonaws.ClientConfiguration.)))
([id secret-key client-config] ([id secret-key client-config]
(AmazonSQSClient. (com.amazonaws.auth.BasicAWSCredentials. id secret-key) (AmazonSQSClient. (BasicAWSCredentials. id secret-key)
(.withUserAgent client-config "Bandalore - SQS for Clojure")))) (.withUserAgent client-config "Bandalore - SQS for Clojure"))))
(defn create-async-client
"Creates a asynchronous AmazonSQSClient using the provided account id and secret key."
([id secret-key]
(AmazonSQSAsyncClient. (BasicAWSCredentials. id secret-key))))
(defn create-buffered-async-client
"Creates a buffered asynchronous AmazonSQSClient using the provided account id, secret key,
and optional com.amazonaws.services.sqs.buffered.QueueBufferConfig."
([id secret-key]
(create-buffered-async-client id secret-key (QueueBufferConfig.)))
([id secret-key client-config]
(AmazonSQSBufferedAsyncClient. (create-async-client id secret-key) client-config)))
(def ^{:private true} visibility-warned? (atom false)) (def ^{:private true} visibility-warned? (atom false))
(defn create-queue (defn create-queue
"Creates a queue with the given name, returning the corresponding URL string. "Creates a queue with the given name, returning the corresponding URL string.
Returns successfully if the queue already exists." Returns successfully if the queue already exists."
[^AmazonSQSClient client queue-name & {:as options}] [^AmazonSQS client queue-name & {:as options}]
(when (and (:visibility options) (not @visibility-warned?)) (when (and (:visibility options) (not @visibility-warned?))
(println "[WARNING] :visibility option to cemerick.bandalore/create-queue no longer supported;") (println "[WARNING] :visibility option to cemerick.bandalore/create-queue no longer supported;")
(println "[WARNING] See https://github.com/cemerick/bandalore/issues/3") (println "[WARNING] See https://github.com/cemerick/bandalore/issues/3")
@ -46,12 +61,12 @@
(defn delete-queue (defn delete-queue
"Deletes the queue specified by the given URL string." "Deletes the queue specified by the given URL string."
[^AmazonSQSClient client queue-url] [^AmazonSQS client queue-url]
(.deleteQueue client (DeleteQueueRequest. queue-url))) (.deleteQueue client (DeleteQueueRequest. queue-url)))
(defn list-queues (defn list-queues
"Returns a seq of all queues' URL strings." "Returns a seq of all queues' URL strings."
[^AmazonSQSClient client] [^AmazonSQS client]
(->> (ListQueuesRequest.) (->> (ListQueuesRequest.)
(.listQueues client) (.listQueues client)
.getQueueUrls .getQueueUrls
@ -64,13 +79,13 @@
and values. Note that the SQS API only supports setting one queue attribute and values. Note that the SQS API only supports setting one queue attribute
per request, so each attribute name/value pair in the provided `attr-map` per request, so each attribute name/value pair in the provided `attr-map`
will provoke a separate API request." will provoke a separate API request."
([^AmazonSQSClient client queue-url] ([^AmazonSQS client queue-url]
(->> (->>
(.withAttributeNames (GetQueueAttributesRequest. queue-url) #{"All"}) (.withAttributeNames (GetQueueAttributesRequest. queue-url) #{"All"})
(.getQueueAttributes client) (.getQueueAttributes client)
.getAttributes .getAttributes
(into {}))) (into {})))
([^AmazonSQSClient client queue-url attr-map] ([^AmazonSQS client queue-url attr-map]
(doseq [[k v] attr-map] (doseq [[k v] attr-map]
(.setQueueAttributes client (.setQueueAttributes client
(SetQueueAttributesRequest. queue-url {(str k) (str v)}))))) (SetQueueAttributesRequest. queue-url {(str k) (str v)})))))
@ -78,7 +93,7 @@
(defn send (defn send
"Sends a new message with the given string body to the queue specified "Sends a new message with the given string body to the queue specified
by the string URL. Returns a map with :id and :body-md5 slots." by the string URL. Returns a map with :id and :body-md5 slots."
[^AmazonSQSClient client queue-url message] [^AmazonSQS client queue-url message]
(let [resp (.sendMessage client (SendMessageRequest. queue-url message))] (let [resp (.sendMessage client (SendMessageRequest. queue-url message))]
{:id (.getMessageId resp) {:id (.getMessageId resp)
:body-md5 (.getMD5OfMessageBody resp)})) :body-md5 (.getMD5OfMessageBody resp)}))
@ -115,7 +130,7 @@
:receipt-handle - the ID used to delete the message from the queue after :receipt-handle - the ID used to delete the message from the queue after
it has been fully processed. it has been fully processed.
:source-queue - the URL of the queue from which the message was received" :source-queue - the URL of the queue from which the message was received"
[^AmazonSQSClient client queue-url & {:keys [limit visibility ^java.util.Collection attributes] [^AmazonSQS client queue-url & {:keys [limit visibility ^java.util.Collection attributes]
:or {limit 1 :or {limit 1
attributes #{}}}] attributes #{}}}]
(let [req (-> (ReceiveMessageRequest. queue-url) (let [req (-> (ReceiveMessageRequest. queue-url)
@ -168,9 +183,9 @@
if the `queue-url` is not provided explicitly. Otherwise, if the `queue-url` is not provided explicitly. Otherwise,
`message` may be a message map, Message object, `message` may be a message map, Message object,
or the :receipt-handle value from a received message map." or the :receipt-handle value from a received message map."
([^AmazonSQSClient client message] ([^AmazonSQS client message]
(delete client (:source-queue message) message)) (delete client (:source-queue message) message))
([^AmazonSQSClient client queue-url message] ([^AmazonSQS client queue-url message]
(->> message (->> message
receipt-handle receipt-handle
(DeleteMessageRequest. queue-url) (DeleteMessageRequest. queue-url)
@ -194,7 +209,7 @@
`message` may be a message map, Message object, `message` may be a message map, Message object,
or the :receipt-handle value from a received message map." or the :receipt-handle value from a received message map."
[^AmazonSQSClient client queue-url message visibility-timeout] [^AmazonSQS client queue-url message visibility-timeout]
(.changeMessageVisibility client (.changeMessageVisibility client
(ChangeMessageVisibilityRequest. (ChangeMessageVisibilityRequest.
queue-url (receipt-handle message) visibility-timeout))) queue-url (receipt-handle message) visibility-timeout)))