Compare commits

...

35 commits

Author SHA1 Message Date
Chas Emerick
a0fb4273b2 Merge pull request #11 from dacamo76/readme-long-polling-example
Update README with long-polling usage example
2014-09-13 07:30:15 -04:00
Daniel Alberto Cañas
478210ccb2 Update README with long-polling usage example 2014-09-12 16:26:30 -06:00
Chas Emerick
e4a15227f6 [maven-release-plugin] prepare for next development iteration 2014-09-09 08:02:07 -04:00
Chas Emerick
09e7772fd7 [maven-release-plugin] prepare release bandalore-0.0.6 2014-09-09 08:02:07 -04:00
Chas Emerick
42751b891a 0.0.6 2014-09-09 07:57:14 -04:00
Daniel Alberto Cañas
daffed7bcc Minimize Integer object construction
Use Integer.valueOf(int) factory method instead of new Integer(int) constructor
to take advantage of caching.
This is especially useful when creating millions of
ReceiveMessageRequests with the same options.
2014-06-24 21:48:33 -06:00
Daniel Alberto Cañas
ce8c02bdc4 Adds long polling support
Adds long polling support to receive message requests.
The option 'wait-time-seconds' has been added to the 'receive' function.
This option behaves exactly the same as the 'WaitTimeSeconds' parameter
of the ReceiveMessageRequest class in the AWS Java SDK.

The following text is shamelessly taken from the (docs
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html):

Long polling allows the Amazon SQS service to wait until a message is
available in the queue before sending a response. This helps reduce the
amount of empty responses, thus reducing costs.

Another benefit is reducing the false empty reponses, where messages are
available in the queue but are not included in the response.
This happens when Amazon SQS uses short (standard) polling, the default
behavior, where only a subset of the servers (based on a weighted random
distribution) are queried to see if any messages are available to
include in the response. On the other hand, when long polling is
enabled, Amazon SQS queries all of the servers.
2014-06-24 21:18:06 -06:00
Daniel Alberto Cañas
98cb434787 Bumps aws-sdk-java to 1.8.0
Updates version to support long polling.
2014-06-24 16:46:04 -06:00
Chas Emerick
bfb0fca2e7 0.0.5 in README 2013-12-11 07:46:15 -05:00
Chas Emerick
e9f1afbb50 [maven-release-plugin] prepare for next development iteration 2013-12-11 06:55:22 -05:00
Chas Emerick
3d1c859c62 [maven-release-plugin] prepare release bandalore-0.0.5 2013-12-11 06:55:22 -05:00
Tim Chagnon
382fd20418 Make :prefix a keyword argument of list-queues 2013-11-26 10:21:16 -08:00
Tim Chagnon
960c5e4966 Add optional prefix argument to list-queues 2013-11-25 21:07:09 -08:00
Chas Emerick
b7d186125c link to better page on IAM roles for Java devs 2013-10-17 06:52:15 -04:00
Chas Emerick
e31c010c08 [maven-release-plugin] prepare for next development iteration 2013-10-17 06:47:08 -04:00
Chas Emerick
14b7b05b70 [maven-release-plugin] prepare release bandalore-0.0.4 2013-10-17 06:47:07 -04:00
Chas Emerick
e4fe404f72 0.0.4 2013-10-17 06:29:49 -04:00
Chas Emerick
5a84c6d71b github doesn't like HTML in its markdown... 2013-10-17 06:24:27 -04:00
Chas Emerick
a2fc3400bc add doc urging use of IAM roles in production, ref gh-4 2013-10-17 06:19:06 -04:00
Kei Tsuji
21ed5eb278 Adding support for calling AmazonSQSClient. without id/password to support authentication via IAM role.
From EC2 instance with IAM Role, we can now create client like this (create-client).
2013-10-17 13:16:35 +09:00
Chas Emerick
48c5f17ad5 Make it slightly more clear that :max-wait is denominated in milliseconds 2013-02-13 14:04:46 -05:00
Chas Emerick
b76312303a tweak readme for 0.0.3 2012-10-02 08:33:05 -04:00
Chas Emerick
47b44e7555 [maven-release-plugin] prepare for next development iteration 2012-10-02 08:33:05 -04:00
Chas Emerick
49e54854da [maven-release-plugin] prepare release bandalore-0.0.3 2012-10-02 08:33:05 -04:00
Chas Emerick
eb2b3524ee upgrade to latest AWS SDK, tweak APIs usage to suit. 2012-10-02 08:33:05 -04:00
Chas Emerick
9b782e4817 default to Clojure 1.4.0 2012-10-02 08:33:05 -04:00
Chas Emerick
60ed575738 remove last asciidoc link 2012-02-24 11:14:34 -05:00
Chas Emerick
0ecacd49dd [maven-release-plugin] prepare for next development iteration 2012-02-24 11:05:20 -05:00
Chas Emerick
cdc1c59924 [maven-release-plugin] prepare release bandalore-0.0.2 2012-02-24 11:05:19 -05:00
Chas Emerick
112d6121d5 readme asciidoc -> markdown 2012-02-24 11:01:53 -05:00
Chas Emerick
6273806597 integration tests with all supported versions of Clojure 2012-02-24 11:01:53 -05:00
Chas Emerick
d55dbb0590 Clojure 1.3.0 compatibility; use it as the default dependency, fixes gh-1 2012-02-24 11:01:53 -05:00
Chas Emerick
7ae72ff7d9 test tweaks 2012-02-24 11:01:49 -05:00
Chas Emerick
3dfd311a9c fix maven groupId 2011-02-23 07:50:47 -05:00
Chas Emerick
978cfe5945 [maven-release-plugin] prepare for next development iteration 2011-02-23 07:38:57 -05:00
6 changed files with 288 additions and 124 deletions

View file

@ -1,69 +1,84 @@
= Bandalore
# Bandalore
http://github.com/cemerick/bandalore[Bandalore] is a Clojure client
library for Amazon's http://aws.amazon.com/sqs/[Simple Queue Service (SQS)]. It depends upon
the standard http://aws.amazon.com/sdkforjava/[AWS SDK for Java],
[Bandalore](http://github.com/cemerick/bandalore) is a Clojure client
library for Amazon's [Simple Queue Service](http://aws.amazon.com/sqs/). It depends upon
the standard [AWS SDK for Java](http://aws.amazon.com/sdkforjava/),
and provides a Clojure-idiomatic API for the SQS-related functionality
therein.
== "Installation"
## "Installation"
Bandalore is available in Maven central. Add it to your Maven project's `pom.xml`:
----
```xml
<dependency>
<groupId>cemerick</groupId>
<groupId>com.cemerick</groupId>
<artifactId>bandalore</artifactId>
<version>0.0.1</version>
<version>0.0.6</version>
</dependency>
----
```
or your leiningen project.clj:
----
[cemerick/bandalore "0.0.1"]
----
```clojure
[com.cemerick/bandalore "0.0.6"]
```
== Logging
Bandalore is compatible with Clojure 1.2.0+.
## Logging
I strongly recommend squelching the AWS SDK's very verbose logging
before using Bandalore (the former spews a variety of stuff out on
INFO that I personally think should be in DEBUG or TRACE). You can
do this with this snippet:
----
```clojure
(.setLevel (java.util.logging.Logger/getLogger "com.amazonaws")
java.util.logging.Level/WARNING)
----
```
Translate as necessary if you're using log4j, etc.
== Usage
## Usage
You should be familiar with http://aws.amazon.com/sqs/[SQS itself]
You should be familiar with [SQS itself](http://aws.amazon.com/sqs/)
before sensibly using this library. That said, Bandalore's API
is well-documented.
You'll first need to load the library and create a SQS client object
to do anything:
----
```clojure
(require '[cemerick.bandalore :as sqs])
(def client (sqs/create-client "your aws id" "your aws secret-key"))
----
```
**Security Note** If your application using Bandalore is deployed to EC2, _you
should not put your AWS credentials on those EC2 nodes_. Rather,
[give your EC2 instances IAM roles](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-roles.html),
and use the nullary arity of `create-client`:
```clojure
(require '[cemerick.bandalore :as sqs])
(def client (sqs/create-client))
```
This will use credentials assigned to your EC2 node based on its
role that are automatically rotated.
You can create, delete, and list queues:
----
=> (sqs/create-queue client "foo")
```clojure
#> (sqs/create-queue client "foo")
"https://queue.amazonaws.com/499312652346/foo"
=> (sqs/list-queues client)
#> (sqs/list-queues client)
("https://queue.amazonaws.com/499312652346/foo")
=> (sqs/delete-queue client (first *1))
#> (sqs/delete-queue client (first *1))
nil
=> (list-queues client)
#> (list-queues client)
nil
----
```
*Note that SQS is _eventually consistent_. This means that a created
queue won't necessarily show up in an immediate listing of queues,
@ -71,95 +86,117 @@ messages aren't necessarily immediately available to be received, etc.*
You can send, receive, and delete messages:
----
=> (def q (sqs/create-queue client "foo"))
```clojure
#> (def q (sqs/create-queue client "foo"))
#'cemerick.bandalore-test/q
=> (sqs/send client q "my message body")
#> (sqs/send client q "my message body")
{:id "75d5d7a1-2274-4163-97b2-aa4c75f209ee", :body-md5 "05d358de00fc63dd2fa2026b77e112f6"}
=> (sqs/receive client q)
#> (sqs/receive client q)
({:attrs #<HashMap {}>, :body "my message body", :body-md5 "05d358de00fc63dd2fa2026b77e112f6",
:id "75d5d7a1-2274-4163-97b2-aa4c75f209ee",
:receipt-handle "…very long string…"})
;;
;; …presumably do something with the received message(s)…
;;
=> (sqs/delete client q (first *1))
#> (sqs/delete client q (first *1))
nil
=> (sqs/receive client q)
#> (sqs/receive client q)
()
----
```
That's cleaner than having to interop directly with the Java SDK, but it's all
pretty pedestrian stuff. You can do more interesting things with some
simple higher-order functions and other nifty Clojure facilities.
=== Sending and receiving Clojure values
### Enabling SQS Long Polling
[Long polling](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html) reduces the number of empty responses by allowing Amazon SQS service to wait until a message is available in the queue before sending a response. You can enable long polling on an individual receive request by supplying the optional kwarg `:wait-time-seconds`:
:wait-time-seconds - time in seconds (bewteen 0 and 20) for SQS to wait if there are no messages in the queue. A value of 0 indicates no long polling.
```clojure
; ensure our queue is empty to start
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"0"
#> (let [no-polling (future (sqs/receive client q))
long-polling (future (sqs/receive client q :wait-time-seconds 20))]
(Thread/sleep 10000) ;; Sleep 10s before sending message
(sqs/send client q "my message body")
(println (count @no-polling))
(println (count @long-polling)))
0
1
nil
```
### Sending and receiving Clojure values
SQS' message bodies are strings, so you can stuff anything in them that you can
serialize to a string. That said, `pr-str` and `read-string` are too handy
to not use, assuming your consumers are using Clojure as well:
----
=> (sqs/send client q (pr-str {:a 5 :b "blah" :c 6.022e23}))
```clojure
#> (sqs/send client q (pr-str {:a 5 :b "blah" :c 6.022e23}))
{:id "3756c302-866a-4fcc-a7a3-746e6f531f47", :body-md5 "60052fc2ffb835257c26b9957c6e9ffd"}
=> (-?> (sqs/receive client q) first :body read-string)
#> (-?> (sqs/receive client q) first :body read-string)
{:a 5, :b "blah", :c 6.022E23}
----
```
=== Sending seqs of messages
### Sending seqs of messages
…with more gratuitous use of `pr-str` and `read-string` to send and receive
Clojure values:
----
=> (->> [:foo 'bar ["some vector" 42] #{#"silly place for a regex"}]
```clojure
#> (->> [:foo 'bar ["some vector" 42] #{#"silly place for a regex"}]
(map (comp (partial sqs/send client q) pr-str))
dorun)
nil
=> (map (comp read-string :body)
#> (map (comp read-string :body)
(sqs/receive client q :limit 10))
(bar ["some vector" 42])
=> (map (comp read-string :body)
#> (map (comp read-string :body)
(sqs/receive client q :limit 10))
(#{#"silly place for a regex"})
=> (map (comp read-string :body)
#> (map (comp read-string :body)
(sqs/receive client q :limit 10))
(:foo)
----
```
=== (Mostly) automatic deletion of consumed messages
### (Mostly) automatic deletion of consumed messages
When you're done processing a received message, you need to delete it from its
originaing queue:
----
```clojure
; ensure our queue is empty to start
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"0"
=> (dorun (map (partial sqs/send client q) (map str (range 100))))
#> (dorun (map (partial sqs/send client q) (map str (range 100))))
nil
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"100"
; received messages must be removed from the queue or they will
; be delivered again after their visibility timeout expires
=> (sqs/receive client q)
#> (sqs/receive client q)
(…message seq…)
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"100"
=> (->> (sqs/receive client q) first (sqs/delete client))
#> (->> (sqs/receive client q) first (sqs/delete client))
nil
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"99"
----
```
Rather than trying to remember to do this, just use the
`deleting-consumer` "middleware" to produce a function that calls
the message-processing function you provide to it, and then
automatically deletes the processed message from the origining queue:
----
=> (doall (map
```clojure
#> (doall (map
(sqs/deleting-consumer client (comp println :body))
(sqs/receive client q :limit 10)))
0
@ -173,11 +210,11 @@ automatically deletes the processed message from the origining queue:
52
55
(nil nil nil nil nil nil nil nil nil nil)
=> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
#> (get (sqs/queue-attrs client q) "ApproximateNumberOfMessages")
"90"
----
```
=== Consuming queues as seqs
### Consuming queues as seqs
seqs being the _lingua franca_ of Clojure collections, it would be helpful if we
could treat an SQS queue as a seq of messages. While `receive` does return
@ -187,11 +224,11 @@ a seq of messages, each `receive` call is limited to receiving a maximum of
The solution to this is `polling-receive`, which returns a lazy seq that
reaches out to SQS as necessary:
----
=> (map (sqs/deleting-consumer client :body)
```clojure
#> (map (sqs/deleting-consumer client :body)
(sqs/polling-receive client q :limit 10))
("3" "5" "7" "8" ... "81" "90" "91")
----
```
`polling-receive` accepts all of the same optional kwargs as `receive` does,
but adds two more to control its usage of `receive`:
@ -210,21 +247,21 @@ terminate because none have been available for a while.
Here's an example where one thread sends a message once a second for a minute,
and another consumes those messages using a lazy seq provided by `polling-receive`:
----
=> (defn send-dummy-messages
```clojure
#> (defn send-dummy-messages
[client q count]
(future (doseq [n (range count)]
(Thread/sleep 100)
(sqs/send client q (str n)))))
#'cemerick.bandalore-test/send-dummy-messages
=> (defn consume-dummy-messages
#> (defn consume-dummy-messages
[client q]
(future (dorun (map (sqs/deleting-consumer client (comp println :body))
(sqs/polling-receive client q :max-wait Integer/MAX_VALUE :limit 10)))))
(sqs/polling-receive client q :max-wait Long/MAX_VALUE :limit 10)))))
#'cemerick.bandalore-test/consume-dummy-messages
=> (consume-dummy-messages client q) ;; start the consumer
#> (consume-dummy-messages client q) ;; start the consumer
#<core$future_call$reify__5500@a6f00bc: :pending>
=> (send-dummy-messages client q 1000) ;; start the sender
#> (send-dummy-messages client q 1000) ;; start the sender
#<core$future_call$reify__5500@18986032: :pending>
3
4
@ -235,20 +272,20 @@ and another consumes those messages using a lazy seq provided by `polling-receiv
5
7
...
----
```
You'd presumably want to set up some ways to control your consumer, but hopefully
you see that it would be trivial to parallelize the processing function being
wrapped by `deleting-consumer` using `pmap`, distribute processing among agents
if that's more appropriate, etc.
== Building Bandalore
## Building Bandalore
Have maven. From the command line:
----
$ mvn clean install
----
```
$ mvn clean verify
```
*The tests are all live*, so:
@ -261,27 +298,27 @@ Since the tests are live, you either need to add your AWS credentials to your
`~/.m2/settings.xml` file as properties, or specify them on the command line
using `-D` switches:
----
```
$ mvn -Daws.id=XXXXXXX -Daws.secret-key=YYYYYYY clean install
----
```
Or, you can skip the tests entirely:
----
```
$ mvn -Dmaven.test.skip=true clean install
----
```
In any case, you'll find a built `.jar` file in the `target` directory, and in
its designated spot in `~/.m2/repository` (assuming you ran `install` rather than
e.g. `package`).
== Need Help?
## Need Help?
Ping `cemerick` on freenode irc or twitter if you have questions
or would like to contribute patches.
== License
## License
Copyright © 2011 Chas Emerick
Copyright © 2011-2013 Chas Emerick and contributors.
Licensed under the EPL. (See the file epl-v10.html.)

34
pom.xml
View file

@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.cemerick</groupId>
<artifactId>bandalore</artifactId>
<version>0.0.1</version>
<version>0.0.7-SNAPSHOT</version>
<name>bandalore</name>
<description>A Clojure library for Amazon's Simple Queue Service (SQS).</description>
<url>http://github.com/cemerick/bandalore</url>
@ -29,20 +29,14 @@
</scm>
<properties>
<clojure.version>1.2.0</clojure.version>
<clojure.version>1.4.0</clojure.version>
</properties>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure-contrib</artifactId>
<version>1.2.0</version>
<scope>test</scope>
<version>1.8.0</version>
</dependency>
</dependencies>
@ -63,6 +57,28 @@
<warnOnReflection>true</warnOnReflection>
</configuration>
</plugin>
<plugin>
<artifactId>maven-invoker-plugin</artifactId>
<version>1.5</version>
<configuration>
<projectsDirectory>src/integration</projectsDirectory>
<cloneProjectsTo>${project.build.directory}/integration</cloneProjectsTo>
<pomIncludes>
<pomInclude>*/pom.xml</pomInclude>
</pomIncludes>
<streamLogs>true</streamLogs>
<goals><goal>clojure:test</goal></goals>
</configuration>
<executions>
<execution>
<id>integration-test</id>
<goals>
<goal>install</goal>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View file

@ -0,0 +1,44 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>clojure.tools</groupId>
<version>0.0.0</version>
<artifactId>test-clojure-1.2.0</artifactId>
<name>(Clojure 1.2.0 tests)</name>
<dependencies>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>@project.groupId@</groupId>
<artifactId>@project.artifactId@</artifactId>
<version>@project.version@</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.theoryinpractise</groupId>
<artifactId>clojure-maven-plugin</artifactId>
<version>1.3.8</version>
<configuration>
<baseTestSourceDirectory>@basedir@/src/test/clojure</baseTestSourceDirectory>
<clojureOptions>-Daws.id=@aws.id@ -Daws.secret-key=@aws.secret-key@</clojureOptions>
</configuration>
<executions>
<execution>
<id>test-clojure</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,44 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>clojure.tools</groupId>
<version>0.0.0</version>
<artifactId>test-clojure-1.3.0</artifactId>
<name>(Clojure 1.3.0 tests)</name>
<dependencies>
<dependency>
<groupId>org.clojure</groupId>
<artifactId>clojure</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>@project.groupId@</groupId>
<artifactId>@project.artifactId@</artifactId>
<version>@project.version@</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.theoryinpractise</groupId>
<artifactId>clojure-maven-plugin</artifactId>
<version>1.3.8</version>
<configuration>
<baseTestSourceDirectory>@basedir@/src/test/clojure</baseTestSourceDirectory>
<clojureOptions>-Daws.id=@aws.id@ -Daws.secret-key=@aws.secret-key@</clojureOptions>
</configuration>
<executions>
<execution>
<id>test-clojure</id>
<phase>test</phase>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -19,22 +19,28 @@
(defn create-client
"Creates a synchronous AmazonSQSClient using the provided account id, secret key,
and optional com.amazonaws.ClientConfiguration."
([]
(create-client (com.amazonaws.ClientConfiguration.)))
([client-config]
(AmazonSQSClient.
(.withUserAgent client-config "Bandalore - SQS for Clojure")))
([id secret-key]
(create-client id secret-key (com.amazonaws.ClientConfiguration.)))
([id secret-key client-config]
(AmazonSQSClient. (com.amazonaws.auth.BasicAWSCredentials. id secret-key)
(.withUserAgent client-config "Bandalore - SQS for Clojure"))))
(def ^{:private true} visibility-warned? (atom false))
(defn create-queue
"Creates a queue with the given name, returning the corresponding URL string.
Returns successfully if the queue already exists.
Specify an optional :visibility keyword arg to set the new queue's default
visibility timeout in seconds."
[^AmazonSQSClient client queue-name & {:keys [visibility]}]
(->> (if visibility
(CreateQueueRequest. queue-name visibility)
(CreateQueueRequest. queue-name))
Returns successfully if the queue already exists."
[^AmazonSQSClient client queue-name & {:as options}]
(when (and (:visibility options) (not @visibility-warned?))
(println "[WARNING] :visibility option to cemerick.bandalore/create-queue no longer supported;")
(println "[WARNING] See https://github.com/cemerick/bandalore/issues/3")
(reset! visibility-warned? true))
(->> (CreateQueueRequest. queue-name)
(.createQueue client)
.getQueueUrl))
@ -44,9 +50,10 @@
(.deleteQueue client (DeleteQueueRequest. queue-url)))
(defn list-queues
"Returns a seq of all queues' URL strings."
[^AmazonSQSClient client]
(->> (ListQueuesRequest.)
"Returns a seq of all queues' URL strings. Takes an optional string prefix
argument to only list queues with names that start with the prefix."
[^AmazonSQSClient client & {:keys [prefix]}]
(->> (ListQueuesRequest. prefix)
(.listQueues client)
.getQueueUrls
seq))
@ -99,6 +106,11 @@
Defaults to the empty set (i.e. no attributes will be included in
received messages).
See the SQS documentation for all support message attributes.
:wait-time-seconds - enables long poll support. time is in seconds, bewteen
0 (default - no long polling) and 20.
Allows Amazon SQS service to wait until a message is available
in the queue before sending a response.
See the SQS documentation at (http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html)
Returns a seq of maps with these slots:
@ -109,13 +121,17 @@
:receipt-handle - the ID used to delete the message from the queue after
it has been fully processed.
:source-queue - the URL of the queue from which the message was received"
[^AmazonSQSClient client queue-url & {:keys [limit visibility ^java.util.Collection attributes]
[^AmazonSQSClient client queue-url & {:keys [limit
visibility
wait-time-seconds
^java.util.Collection attributes]
:or {limit 1
attributes #{}}}]
(let [req (-> (ReceiveMessageRequest. queue-url)
(.withMaxNumberOfMessages (-> limit (min 10) (max 1)))
(.withMaxNumberOfMessages (-> limit (min 10) (max 1) int Integer/valueOf))
(.withAttributeNames attributes))
req (if visibility (.withVisibilityTimeout req visibility) req)]
req (if wait-time-seconds (.withWaitTimeSeconds req (Integer/valueOf (int wait-time-seconds))) req)
req (if visibility (.withVisibilityTimeout req (Integer/valueOf (int visibility))) req)]
(->> (.receiveMessage client req)
.getMessages
(map (partial message-map queue-url)))))
@ -199,4 +215,4 @@
;; void removePermission(RemovePermissionRequest removePermissionRequest)
; The RemovePermission action revokes any permissions in the queue policy that matches the specified Label parameter.
;; void addPermission(AddPermissionRequest addPermissionRequest)
;;The AddPermission action adds a permission to a queue for a specific principal.
;;The AddPermission action adds a permission to a queue for a specific principal.

View file

@ -1,13 +1,8 @@
(ns cemerick.bandalore-test
(:use cemerick.bandalore
clojure.test
clojure.contrib.core)
clojure.test)
(:refer-clojure :exclude (send)))
#_(do
(System/setProperty "aws.id" "")
(System/setProperty "aws.secret-key" ""))
; kill the verbose aws logging
(.setLevel (java.util.logging.Logger/getLogger "com.amazonaws")
java.util.logging.Level/WARNING)
@ -18,7 +13,7 @@
(assert (and id secret-key))
(create-client id secret-key)))
(def *test-queue-url* nil)
(def ^{:dynamic true} *test-queue-url* nil)
(defn- uuid
[]
@ -56,30 +51,34 @@
`(deftest ~name
(println '~name) ; lots of sleeping in these tests, give some indication of life
(binding [*test-queue-url* (create-queue client (test-queue-name))]
(try
(is *test-queue-url*)
~@body
(finally
(delete-queue client *test-queue-url*))))))
(is *test-queue-url*)
~@body)))
(defsqstest test-list-queues
(let [msg (uuid)]
; sending a msg seems to "force" the queue's existence in listings
(send client *test-queue-url* msg)
(wait-for-condition #((set (list-queues client)) *test-queue-url*)
"Created queue not visible in result of list-queues")))
"Created queue not visible in result of list-queues")
(wait-for-condition #((set (list-queues client :prefix test-queue-name-prefix)) *test-queue-url*)
"Created queue not visible in result of list-queues with prefix")))
(defsqstest test-queue-attrs
(let [{:strs [VisibilityTimeout MaximumMessageSize] :as base-attrs} (queue-attrs client *test-queue-url*)
expected {"VisibilityTimeout" "117" "MaximumMessageSize" "1535"}]
(is (and VisibilityTimeout MaximumMessageSize))
(let [{:strs [MaximumMessageSize] :as base-attrs} (queue-attrs client *test-queue-url*)
expected {"MaximumMessageSize" "1535"}]
(is MaximumMessageSize)
(queue-attrs client *test-queue-url* expected)
(wait-for-condition #(= expected
(->> (queue-attrs client *test-queue-url*)
(filter (comp (set (keys expected)) first))
(into {})))
(wait-for-condition #(= expected (select-keys (queue-attrs client *test-queue-url*) (keys expected)))
"Queue attribute test failed after waiting for test condition")))
(defsqstest receive-delete
(let [msg (uuid)]
(send client *test-queue-url* msg)
(let [[{:keys [body] :as rmsg}] (receive client *test-queue-url*)]
(is (= msg body))
(delete client *test-queue-url* rmsg)
(is (empty? (receive client *test-queue-url*))))))
(defn- wait-for-full-queue
[q min-cnt queue-name]
(wait-for-condition #(-> (queue-attrs client q)
@ -122,3 +121,11 @@
(let [v (-> (receive client *test-queue-url* :visibility 5) first :body read-string)]
(is (some #(= v (-> % :body read-string)) (polling-receive client *test-queue-url* :max-wait 10000)))))
(defsqstest test-receive-long-polling
(let [q *test-queue-url*
no-poll (future (receive client q))
long-poll (future (receive client q :wait-time-seconds 20))]
(Thread/sleep 10000)
(send client q "1")
(is (== 0 (count @no-poll)) "Should not return messages")
(is (== 1 (count @long-poll)) "Should return 1 message")))