Skip to content

Commit 7a47f42

Browse files
author
royee
committed
feat: add httpsqs client
0 parents  commit 7a47f42

8 files changed

Lines changed: 509 additions & 0 deletions

File tree

.gitignore

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
### Go template
2+
# Binaries for programs and plugins
3+
*.exe
4+
*.exe~
5+
*.dll
6+
*.so
7+
*.dylib
8+
9+
# Test binary, built with `go test -c`
10+
*.test
11+
12+
# Output of the go coverage tool, specifically when used with LiteIDE
13+
*.out
14+
15+
# Dependency directories (remove the comment below to include it)
16+
# vendor/
17+
18+
### IDE
19+
.idea

LICENSE.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
MIT License
2+
3+
Copyright (c) 2025 gomooth,save95
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
6+
7+
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
8+
9+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

README.md

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
# httpsqs
2+
3+
HTTPSQS 队列客户端
4+
5+
[基于HTTP协议的轻量级开源简单队列服务:HTTPSQS@张宴](http://blog.zyan.cc/httpsqs/)
6+
7+
```golang
8+
9+
type IClient interface {
10+
// Put 入队列(将文本消息放入队列)
11+
// 返回:当前队列的读取位置点 pos,及可能存在的错误
12+
// pos = -1 则表示队列已满,同时也会返回 err
13+
Put(ctx context.Context, name, data string) (pos int64, err error)
14+
15+
// Get 出队列(从队列中取出文本消息)
16+
// 返回 文本消息 data,当前队列的读取位置点 pos,及可能存在的错误
17+
// pos = -1 则表示没有未取出的消息队列(HTTPSQS_GET_END)
18+
Get(ctx context.Context, name string) (data string, pos int64, err error)
19+
20+
// Status 查看队列状态
21+
Status(ctx context.Context, name string) (*Status, error)
22+
23+
// View 查看指定队列位置点的内容
24+
// 跟一般的队列系统不同的是,HTTPSQS 可以查看指定队列ID(队列点)的内容,包括未出、已出的队列内容。
25+
// 可以方便地观测进入队列的内容是否正确。另外,假设有一个发送手机短信的队列,由客户端守护进程从队列
26+
// 中取出信息,并调用“短信网关接口”发送短信。但是,如果某段时间“短信网关接口”有故障,而这段时间队列
27+
// 位置点300~900的信息已经出队列,但是发送短信失败,我们还可以在位置点300~900被覆盖前,查看到这些
28+
// 位置点的内容,作相应的处理。
29+
View(ctx context.Context, name string, pos int64) (string, error)
30+
31+
// Reset 重置指定队列
32+
Reset(ctx context.Context, name string) error
33+
34+
// SetMaxQueue 更改指定队列的最大队列数量。默认的最大队列长度(100万条)
35+
SetMaxQueue(ctx context.Context, name string, max int) error
36+
37+
// SetSyncTime 不停止服务的情况下,修改定时刷新内存缓冲区内容到磁盘的间隔时间
38+
// 从HTTPSQS 1.3版本开始支持此功能。
39+
// 默认间隔时间:5秒 或 httpsqs -s <second> 参数设置的值。
40+
SetSyncTime(ctx context.Context, name string, duration time.Duration) error
41+
}
42+
43+
```
44+
45+
## Usage
46+
47+
### Start using it
48+
Download and install it:
49+
50+
```shell
51+
go get github.com/gomooth/httpsqs
52+
53+
```
54+
55+
Import it in your code:
56+
```bash
57+
import "github.com/gomooth/httpsqs"
58+
59+
```
60+
61+
## Basic Examples
62+
63+
```golang
64+
package main
65+
66+
import (
67+
"context"
68+
"encoding/json"
69+
"fmt"
70+
71+
"github.com/gomooth/httpsqs"
72+
)
73+
74+
func main() {
75+
client := httpsqs.NewClient(&httpsqs.Config{
76+
Addr: "127.0.0.1:1218",
77+
Timeout: 0,
78+
Password: "",
79+
})
80+
name := "queue_test"
81+
ctx := context.Background()
82+
bs, _ := json.Marshal(struct {
83+
Name string
84+
Title string
85+
Age int
86+
Money int
87+
}{
88+
"张三", "无产阶级", 30, 10000,
89+
})
90+
pos, err := client.Put(ctx, name, string(bs))
91+
fmt.Println(pos, err)
92+
}
93+
94+
```

client.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package httpsqs
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
"net/http"
9+
"net/url"
10+
"strconv"
11+
"strings"
12+
"time"
13+
14+
"github.com/save95/xerror"
15+
)
16+
17+
type client struct {
18+
config *Config
19+
}
20+
21+
func NewClient(config *Config) IClient {
22+
return &client{config: config}
23+
}
24+
25+
func (c *client) Put(ctx context.Context, name, data string) (int64, error) {
26+
values := url.Values{}
27+
values.Set("name", name)
28+
values.Set("opt", "put")
29+
values.Set("data", data)
30+
31+
body, headers, err := c.httpGet(ctx, values, []string{"pos"})
32+
if err != nil {
33+
return 0, xerror.Wrap(err, "httpsqs put failed")
34+
}
35+
36+
switch body {
37+
case "HTTPSQS_PUT_OK":
38+
pos, _ := strconv.Atoi(headers["pos"])
39+
return int64(pos), nil
40+
case "HTTPSQS_PUT_ERROR":
41+
return 0, xerror.New("httpsqs put err")
42+
case "HTTPSQS_PUT_END":
43+
return -1, xerror.New("httpsqs is full")
44+
default:
45+
return 0, xerror.Errorf("httpsqs put failed: %s", body)
46+
}
47+
}
48+
49+
func (c *client) Get(ctx context.Context, name string) (string, int64, error) {
50+
values := url.Values{}
51+
values.Set("name", name)
52+
values.Set("opt", "get")
53+
54+
body, headers, err := c.httpGet(ctx, values, []string{"pos"})
55+
if err != nil {
56+
return "", 0, xerror.Wrap(err, "httpsqs get failed")
57+
}
58+
59+
switch body {
60+
case "HTTPSQS_GET_END": // 没有未取出的消息队列
61+
return "", -1, nil
62+
default:
63+
pos, _ := strconv.Atoi(headers["pos"])
64+
return body, int64(pos), nil
65+
}
66+
}
67+
68+
func (c *client) Status(ctx context.Context, name string) (*Status, error) {
69+
values := url.Values{}
70+
values.Set("name", name)
71+
values.Set("opt", "status_json")
72+
73+
body, _, err := c.httpGet(ctx, values, nil)
74+
if err != nil {
75+
return nil, xerror.Wrap(err, "httpsqs status failed")
76+
}
77+
78+
var res Status
79+
if err := json.Unmarshal([]byte(body), &res); nil != err {
80+
return nil, xerror.Wrapf(err, "httpsqs status failed: response not json: %s", body)
81+
}
82+
83+
return &res, nil
84+
}
85+
86+
func (c *client) View(ctx context.Context, name string, pos int64) (string, error) {
87+
if pos <= 0 || pos > 1000000000 {
88+
return "", xerror.New("input pos error")
89+
}
90+
91+
values := url.Values{}
92+
values.Set("name", name)
93+
values.Set("opt", "view")
94+
values.Set("pos", strconv.Itoa(int(pos)))
95+
96+
body, _, err := c.httpGet(ctx, values, nil)
97+
if err != nil {
98+
return "", xerror.Wrap(err, "httpsqs view failed")
99+
}
100+
101+
return body, nil
102+
}
103+
104+
func (c *client) Reset(ctx context.Context, name string) error {
105+
values := url.Values{}
106+
values.Set("name", name)
107+
values.Set("opt", "reset")
108+
109+
body, _, err := c.httpGet(ctx, values, nil)
110+
if err != nil {
111+
return xerror.Wrap(err, "httpsqs reset failed")
112+
}
113+
114+
switch body {
115+
case "HTTPSQS_RESET_OK":
116+
return nil
117+
default:
118+
return xerror.Errorf("httpsqs reset failed: %s", body)
119+
}
120+
}
121+
122+
func (c *client) SetMaxQueue(ctx context.Context, name string, max int) error {
123+
values := url.Values{}
124+
values.Set("name", name)
125+
values.Set("opt", "maxqueue")
126+
values.Set("num", strconv.Itoa(max))
127+
128+
body, _, err := c.httpGet(ctx, values, nil)
129+
if err != nil {
130+
return xerror.Wrap(err, "httpsqs set max_queue failed")
131+
}
132+
133+
switch body {
134+
case "HTTPSQS_MAXQUEUE_OK":
135+
return nil
136+
case "HTTPSQS_MAXQUEUE_CANCEL":
137+
return xerror.New("httpsqs option cancel")
138+
default:
139+
return xerror.Errorf("httpsqs set max_queue failed: %s", body)
140+
}
141+
}
142+
143+
func (c *client) SetSyncTime(ctx context.Context, name string, duration time.Duration) error {
144+
num := int(duration.Seconds())
145+
if num < 1 || num > 1000000000 {
146+
return xerror.New("input num error")
147+
}
148+
149+
values := url.Values{}
150+
values.Set("name", name)
151+
values.Set("opt", "synctime")
152+
values.Set("num", strconv.Itoa(num))
153+
154+
body, _, err := c.httpGet(ctx, values, nil)
155+
if err != nil {
156+
return xerror.Wrap(err, "httpsqs set sync_time failed")
157+
}
158+
159+
switch body {
160+
case "HTTPSQS_SYNCTIME_OK":
161+
return nil
162+
case "HTTPSQS_SYNCTIME_CANCEL":
163+
return xerror.New("httpsqs option cancel")
164+
default:
165+
return xerror.Errorf("httpsqs set synctime failed: %s", body)
166+
}
167+
}
168+
169+
func (c *client) httpGet(ctx context.Context, values url.Values, parseHeaders []string) (string, map[string]string, error) {
170+
furl := fmt.Sprintf("http://%s/", c.config.Addr)
171+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, furl, nil)
172+
if nil != err {
173+
return "", nil, err
174+
}
175+
176+
if len(c.config.Password) > 0 {
177+
values.Set("auth", c.config.Password)
178+
}
179+
req.URL.RawQuery = values.Encode()
180+
181+
httpClient := http.DefaultClient
182+
if c.config.Timeout > 0 {
183+
httpClient.Timeout = c.config.Timeout
184+
} else {
185+
httpClient.Timeout = 5 * time.Second
186+
}
187+
resp, err := httpClient.Do(req)
188+
if nil != err {
189+
return "", nil, err
190+
}
191+
192+
defer func() {
193+
_ = resp.Body.Close()
194+
}()
195+
196+
bodyBytes, err := io.ReadAll(resp.Body)
197+
if err != nil {
198+
return "", nil, err
199+
}
200+
201+
body := strings.TrimSpace(string(bodyBytes))
202+
switch body {
203+
case "HTTPSQS_ERROR": // 全局错误
204+
return "", nil, xerror.New("httpsqs global error")
205+
case "HTTPSQS_AUTH_FAILED": // 密码校验失败
206+
return "", nil, xerror.New("httpsqs auth error")
207+
default:
208+
headerResult := make(map[string]string)
209+
for _, header := range parseHeaders {
210+
headerResult[header] = strings.TrimSpace(resp.Header.Get(header))
211+
}
212+
return body, headerResult, nil
213+
}
214+
}

0 commit comments

Comments
 (0)