多语言客户端实现
以下是各种编程语言的 WebSocket 客户端实现示例,用于连接和调用 iClick API。
提示
所有示例都实现了相同的功能:
- WebSocket 连接和自动重连
- 请求/响应匹配(基于 evtid)
- 超时处理
- 二进制数据支持
- 事件推送处理
注意
以下代码示例可能未经充分验证,仅供参考学习。在生产环境使用前,请务必进行充分的测试和验证。
javascript
// Port of the local API server; interpolated into the ws://127.0.0.1 URL by connectServer().
const wserverPort = 23188
// Pending requests keyed by evtid; each value holds the { resolve, reject } pair of the caller's promise.
const wsevents = {}
// The current WebSocket instance, or null when none exists (reset by the close handler).
let wsclient = null
// Handle of the scheduled reconnect timer, or null when no reconnect is pending.
let reconnectTimer = null
/**
 * Schedule one reconnect attempt three seconds from now.
 *
 * Does nothing when an attempt is already scheduled or the current socket is
 * still connecting, so repeated close events do not pile up timers.
 */
export function reconnect() {
  if (reconnectTimer || wsclient?.readyState === WebSocket.CONNECTING) {
    return
  }
  reconnectTimer = setTimeout(() => {
    // Release the guard before connecting. The promise returned by
    // connectServer() stays pending when the attempt fails, so clearing the
    // guard in .finally() would block every later retry.
    reconnectTimer = null
    console.log('API服务重连中...')
    // A failed attempt is reported by the socket's own error/close handlers,
    // and the close handler schedules the next retry.
    connectServer().catch(() => {})
  }, 1000 * 3)
}
/**
 * Open the WebSocket connection to the local API server and install its handlers.
 *
 * Incoming frames are either JSON text or binary frames laid out as
 * [6-byte ASCII metadata length][JSON metadata][raw payload]. A frame whose
 * `evtid` matches a pending request settles that request; other frames that
 * carry `event` are server pushes.
 *
 * @returns {Promise<void>} Resolves once the socket is open. When a socket
 *   already exists the promise resolves immediately (if open) or as soon as
 *   that socket opens. It does not reject: a failed attempt leaves it pending
 *   while the close handler schedules a reconnect.
 */
export function connectServer() {
  return new Promise((_resolve) => {
    if (wsclient) {
      // Reuse the existing socket instead of leaving the caller waiting forever.
      if (wsclient.readyState === WebSocket.OPEN) {
        _resolve()
      } else {
        wsclient.addEventListener('open', () => _resolve(), { once: true })
      }
      return
    }
    wsclient = new WebSocket(`ws://127.0.0.1:${wserverPort}`)
    wsclient.binaryType = 'arraybuffer'
    wsclient.onopen = () => {
      console.log('API服务连接成功')
      _resolve()
    }
    wsclient.onmessage = (_message) => {
      let _payload = _message.data
      try {
        if (typeof _payload === 'string') {
          _payload = JSON.parse(_payload)
        } else {
          // Binary frame: [metaDataLength][metaData][binary]
          const _decoder = new TextDecoder('utf-8')
          const _metaLength = Number.parseInt(_decoder.decode(_payload.slice(0, 6)), 10)
          if (!Number.isInteger(_metaLength) || _metaLength < 0) {
            throw new Error('invalid binary frame header')
          }
          const _bindata = _payload.slice(6 + _metaLength)
          _payload = JSON.parse(_decoder.decode(_payload.slice(6, 6 + _metaLength)))
          _payload.data = _bindata
        }
      } catch (error) {
        // One malformed frame must not throw out of the handler.
        console.error('解析消息失败:', error.message)
        return
      }
      const _eventId = _payload.evtid
      const _event = wsevents[_eventId]
      if (_event) {
        if (_payload.type === 'error') {
          _event.reject(new Error(_payload.error))
        } else {
          _event.resolve(_payload.data)
        }
        delete wsevents[_eventId]
      } else if (_payload.event) {
        // Server push: dispatch it in whatever way suits the host application.
      }
    }
    wsclient.onclose = (_closeEvent) => {
      wsclient = null
      // The handler receives a CloseEvent; log its numeric code.
      console.log('API服务连接关闭:', _closeEvent?.code)
      reconnect()
    }
    wsclient.onerror = (err) => {
      console.error('API服务连接错误:', err)
    }
  })
}
/**
 * Send one API request and wait for the response carrying the same evtid.
 *
 * @param {string} type - API name, sent as the `type` field.
 * @param {object} [_params={}] - Extra request fields; `type`, `evtid` and
 *   `timeout` always take precedence over same-named keys in here.
 * @param {number} [_timeout=18] - Seconds to wait; values <= 0 disable the
 *   client-side timeout.
 * @returns {Promise<*>} Resolves with the response `data`; rejects when the
 *   socket is not open, sending fails, the server answers with an error, or
 *   the timeout elapses.
 */
export function apiInvoke(type, _params = {}, _timeout = 18) {
  return new Promise((resolve, reject) => {
    if (!wsclient || wsclient.readyState !== WebSocket.OPEN) {
      reject(new Error('API服务连接未就绪'))
      return
    }
    const _eventId = Math.random().toString(36).substring(2, 12)
    const _payload = {
      ..._params,
      type,
      evtid: _eventId,
      timeout: _timeout
    }
    let _timer = null
    // Settling through the registry also cancels the timeout timer, so a
    // finished call does not leave a live timer behind.
    const _settleWith = (_settle) => (_value) => {
      clearTimeout(_timer)
      _settle(_value)
    }
    wsevents[_eventId] = { resolve: _settleWith(resolve), reject: _settleWith(reject) }
    if (_timeout > 0) {
      _timer = setTimeout(() => {
        if (wsevents[_eventId]) {
          delete wsevents[_eventId]
          reject(new Error(`API服务: ${_eventId} Invoke Timeout !`))
        }
      }, _timeout * 1000)
    }
    try {
      wsclient.send(JSON.stringify(_payload))
    } catch (err) {
      // Do not leave a registry entry or timer behind for a request that was never sent.
      clearTimeout(_timer)
      delete wsevents[_eventId]
      reject(err)
    }
  })
}
// 使用示例
await connectServer()
const result = await apiInvoke('click', { x: 100, y: 200, deviceId: 'XXXXXXXXXX' })
javascript
const WebSocket = require('ws')
// Port of the local API server; interpolated into the ws://127.0.0.1 URL by connectServer().
const wserverPort = 23188
// Pending requests keyed by evtid; each value holds the { resolve, reject } pair of the caller's promise.
const wsevents = {}
// The current WebSocket instance, or null when none exists (reset by the 'close' handler).
let wsclient = null
// Handle of the scheduled reconnect timer, or null when no reconnect is pending.
let reconnectTimer = null
/**
 * Schedule one reconnect attempt three seconds from now.
 *
 * Does nothing when an attempt is already scheduled or the current socket is
 * still connecting.
 */
function reconnect() {
  if (reconnectTimer || (wsclient && wsclient.readyState === WebSocket.CONNECTING)) {
    return
  }
  reconnectTimer = setTimeout(() => {
    // Release the guard before connecting. When an attempt fails, the socket's
    // 'close' handler calls reconnect() before a .finally() on the connect
    // promise would run; with the guard still set that call was dropped and
    // reconnection stopped for good.
    reconnectTimer = null
    console.log('API服务重连中...')
    // connectServer() rejects on failure; the error is already logged by its
    // 'error' handler, so swallow it here to avoid an unhandled rejection.
    connectServer().catch(() => {})
  }, 1000 * 3)
}
/**
 * Open the WebSocket connection to the local API server and install its handlers.
 *
 * Incoming messages are either JSON text or binary frames laid out as
 * [6-byte ASCII metadata length][JSON metadata][raw payload]. A message whose
 * `evtid` matches a pending request settles that request; other messages that
 * carry `event` are server pushes.
 *
 * @returns {Promise<void>} Resolves once the socket is open (immediately when
 *   it already is). Rejects when the connection attempt fails.
 */
function connectServer() {
  return new Promise((_resolve, _reject) => {
    if (wsclient) {
      if (wsclient.readyState === WebSocket.OPEN) {
        _resolve()
        return
      }
      // A socket exists but is not usable yet: settle together with it rather
      // than reporting success while apiInvoke() would still be refused.
      wsclient.once('open', () => _resolve())
      wsclient.once('error', _reject)
      wsclient.once('close', () => _reject(new Error('API服务连接未就绪')))
      return
    }
    wsclient = new WebSocket(`ws://127.0.0.1:${wserverPort}`)
    wsclient.on('open', () => {
      console.log('API服务连接成功')
      _resolve()
    })
    wsclient.on('message', (_data) => {
      let _payload = null
      try {
        // The ws library delivers messages as Buffers; a leading { or [ marks plain JSON.
        const _firstByte = _data[0]
        const _isJson = _firstByte === 0x7B || _firstByte === 0x5B
        if (_isJson) {
          _payload = JSON.parse(_data.toString('utf-8'))
        } else {
          // Binary frame: [metaDataLength][metaData][binary]
          const _metaLength = Number.parseInt(_data.subarray(0, 6).toString('utf-8').trim(), 10)
          if (!Number.isInteger(_metaLength) || _metaLength < 0 || 6 + _metaLength > _data.length) {
            throw new Error('invalid binary frame header')
          }
          _payload = JSON.parse(_data.subarray(6, 6 + _metaLength).toString('utf-8'))
          _payload.data = _data.subarray(6 + _metaLength)
        }
      } catch (error) {
        console.error('解析消息失败:', error.message)
        return
      }
      const _eventId = _payload.evtid
      const _event = wsevents[_eventId]
      if (_event) {
        if (_payload.type === 'error') {
          _event.reject(new Error(_payload.error))
        } else {
          _event.resolve(_payload.data)
        }
        delete wsevents[_eventId]
      } else if (_payload.event) {
        // Server push: dispatch it in whatever way suits the host application.
        console.log('收到事件推送:', _payload.event, _payload.data)
      }
    })
    wsclient.on('close', (code) => {
      wsclient = null
      console.log('API服务连接关闭:', code)
      reconnect()
    })
    wsclient.on('error', (err) => {
      console.error('API服务连接错误:', err.message)
      _reject(err)
    })
  })
}
/**
 * Send one API request and wait for the response carrying the same evtid.
 *
 * @param {string} type - API name, sent as the `type` field.
 * @param {object} [_params={}] - Extra request fields; `type`, `evtid` and
 *   `timeout` always take precedence over same-named keys in here.
 * @param {number} [_timeout=18] - Seconds to wait; values <= 0 disable the
 *   client-side timeout.
 * @returns {Promise<*>} Resolves with the response `data`; rejects when the
 *   socket is not open, sending fails, the server answers with an error, or
 *   the timeout elapses.
 */
function apiInvoke(type, _params = {}, _timeout = 18) {
  return new Promise((resolve, reject) => {
    if (!wsclient || wsclient.readyState !== WebSocket.OPEN) {
      reject(new Error('API服务连接未就绪'))
      return
    }
    const _eventId = Math.random().toString(36).substring(2, 12)
    const _payload = {
      ..._params,
      type,
      evtid: _eventId,
      timeout: _timeout
    }
    let _timer = null
    // Settling through the registry also cancels the timeout timer, so a
    // finished call does not keep the event loop alive until the timer fires.
    const _settleWith = (_settle) => (_value) => {
      clearTimeout(_timer)
      _settle(_value)
    }
    wsevents[_eventId] = { resolve: _settleWith(resolve), reject: _settleWith(reject) }
    if (_timeout > 0) {
      _timer = setTimeout(() => {
        if (wsevents[_eventId]) {
          delete wsevents[_eventId]
          reject(new Error(`API服务: ${_eventId} Invoke Timeout !`))
        }
      }, _timeout * 1000)
    }
    try {
      wsclient.send(JSON.stringify(_payload))
    } catch (err) {
      // Do not leave a registry entry or timer behind for a request that was never sent.
      clearTimeout(_timer)
      delete wsevents[_eventId]
      reject(err)
    }
  })
}
// Usage example
/**
 * Connect, list the devices and click on the first one.
 * Connection failures and API errors are both reported through the same
 * catch block instead of escaping as an unhandled rejection.
 */
async function main() {
  try {
    // connectServer() rejects when the server is unreachable, so it belongs inside the try.
    await connectServer()
    // Fetch the device list
    const devices = await apiInvoke('getDevices')
    console.log('设备列表:', devices)
    // Click on the first device, if any
    if (devices && devices.length > 0) {
      const result = await apiInvoke('click', {
        x: 100,
        y: 200,
        deviceId: devices[0].id
      })
      console.log('点击结果:', result)
    }
  } catch (error) {
    console.error('调用失败:', error.message)
  }
}
main()
python
import asyncio
import websockets
import json
import time
from typing import Dict, Any, Optional
class IClickClient:
    """Asyncio WebSocket client for the local API server.

    Requests are matched to responses through the ``evtid`` field; binary
    responses are framed as [6-byte ASCII metadata length][JSON metadata][payload].
    """

    def __init__(self, port: int = 23188):
        self.port = port
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        # Pending requests: evtid -> future awaited by invoke().
        self.events: Dict[str, asyncio.Future] = {}
        self.is_reconnecting = False
        # Strong reference to the reader task so it is not garbage-collected mid-flight.
        self._reader_task: Optional[asyncio.Task] = None

    async def connect(self):
        """Connect to the WebSocket server, retrying until a connection is established."""
        try:
            await self._open()
        except Exception as e:
            print(f'API服务连接错误: {e}')
            await self.reconnect()

    async def _open(self):
        """Open the socket once and start the reader task; raises when the attempt fails."""
        self.ws = await websockets.connect(f'ws://127.0.0.1:{self.port}')
        print('API服务连接成功')
        self._reader_task = asyncio.create_task(self._message_handler())

    async def reconnect(self):
        """Retry every three seconds until connected; concurrent calls are ignored."""
        if self.is_reconnecting:
            return
        self.is_reconnecting = True
        try:
            # Loop here instead of recursing through connect(): the recursive
            # call hit the is_reconnecting guard and gave up after one retry.
            while True:
                print('API服务重连中...')
                await asyncio.sleep(3)
                try:
                    await self._open()
                    return
                except Exception as e:
                    print(f'API服务连接错误: {e}')
        finally:
            self.is_reconnecting = False

    async def _message_handler(self):
        """Receive loop: parse each message and settle the matching pending request."""
        try:
            async for message in self.ws:
                try:
                    self._handle_response(self._parse_message(message))
                except Exception as e:
                    # One malformed message must not stop the receive loop.
                    print(f'消息处理错误: {e}')
        except websockets.exceptions.ConnectionClosed:
            print('API服务连接关闭')
            await self.reconnect()
        except Exception as e:
            print(f'消息处理错误: {e}')

    def _parse_message(self, message) -> dict:
        """Decode a text (JSON) or binary (length-prefixed metadata + payload) message.

        Raises:
            ValueError: when a binary message is shorter than its header claims.
        """
        if isinstance(message, str):
            return json.loads(message)
        # Binary response
        meta_length = int(message[:6].decode('utf-8'))
        if meta_length < 0 or len(message) < 6 + meta_length:
            raise ValueError('truncated binary message')
        meta_data = json.loads(message[6:6 + meta_length].decode('utf-8'))
        meta_data['data'] = message[6 + meta_length:]
        return meta_data

    def _handle_response(self, payload: dict):
        """Settle the pending future registered under the payload's evtid, if any."""
        evtid = payload.get('evtid')
        if evtid and evtid in self.events:
            future = self.events.pop(evtid)
            if not future.done():
                if payload.get('type') == 'error':
                    future.set_exception(Exception(payload.get('error', '未知错误')))
                else:
                    future.set_result(payload.get('data'))

    def _generate_event_id(self) -> str:
        """Return a random 12-character event id."""
        # Local import keeps this block self-contained. A timestamp-based id
        # repeats when two calls land in the same microsecond.
        import uuid
        return uuid.uuid4().hex[:12]

    async def invoke(self, api_type: str, params: Optional[Dict[str, Any]] = None, timeout: int = 18) -> Any:
        """Call an API and return the ``data`` of its response.

        Args:
            api_type: API name, sent as ``type``.
            params: extra request fields; ``type``, ``evtid`` and ``timeout``
                always take precedence over same-named keys.
            timeout: seconds passed to the server; the client waits one second longer.

        Raises:
            Exception: when not connected, on an error response, or on timeout.
        """
        if not self.ws or self.ws.close_code is not None:
            raise Exception('API服务连接未就绪')
        evtid = self._generate_event_id()
        # Reserved keys go last so caller-supplied params cannot break request/response matching.
        payload = {
            **(params or {}),
            'type': api_type,
            'evtid': evtid,
            'timeout': timeout,
        }
        future = asyncio.get_running_loop().create_future()
        self.events[evtid] = future
        try:
            await self.ws.send(json.dumps(payload))
            return await asyncio.wait_for(future, timeout=timeout + 1)
        except asyncio.TimeoutError:
            raise Exception(f'API调用超时: {api_type}')
        finally:
            # Also covers a failing send(), which used to leave the future registered.
            self.events.pop(evtid, None)

    async def disconnect(self):
        """Close the connection, if any."""
        if self.ws:
            await self.ws.close()
            self.ws = None
# Usage example
async def main():
    """Connect, list the devices and click on the first one, always disconnecting at the end."""
    client = IClickClient()
    await client.connect()
    try:
        # Fetch the device list
        devices = await client.invoke('getDevices')
        print(f'设备列表: {devices}')
        # NOTE(review): the `if devices:` branch was missing from the extracted
        # text (only a dangling `else:` remained). It is reconstructed from the
        # Node.js example; confirm the device `id` field name against the API.
        if devices:
            result = await client.invoke('click', {
                'x': 100,
                'y': 200,
                'deviceId': devices[0]['id'],
            })
            print(f'点击结果: {result}')
        else:
            print('没有可用的设备')
    finally:
        await client.disconnect()
if __name__ == '__main__':
    asyncio.run(main())
java
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.security.SecureRandom;
public class IClickClient {
// Port of the local API server; appended to ws://127.0.0.1: in connect().
private static final int PORT = 23188;
// Randomness source for generateEventId().
private static final SecureRandom RANDOM = new SecureRandom();
// Alphabet that event ids are drawn from.
private static final String CHARS = "abcdefghijklmnopqrstuvwxyz0123456789";
// Current WebSocket client; replaced on every connect().
private WebSocketClient wsClient;
// Pending requests: evtid -> future returned to the caller of invoke().
private final ConcurrentHashMap<String, CompletableFuture<Object>> events = new ConcurrentHashMap<>();
private final Gson gson = new Gson();
// Single thread that runs both reconnect attempts and request timeouts.
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
// Guard that keeps reconnect() from scheduling more than one attempt at a time.
private final AtomicBoolean isReconnecting = new AtomicBoolean(false);
/**
 * Starts a new connection to the local API server, replacing any previous one.
 *
 * The handshake runs asynchronously; onOpen is called once the socket is usable.
 * Binary frames are laid out as [6-byte ASCII metadata length][JSON metadata][payload]
 * and complete the matching request with the payload bytes.
 *
 * @throws Exception if the server URI cannot be built
 */
public synchronized void connect() throws Exception {
    // Detach the old client before closing it, so that its onClose callback
    // (which only reconnects for the current client) does not schedule a
    // reconnect that would tear down the connection created below.
    WebSocketClient previous = wsClient;
    wsClient = null;
    if (previous != null) {
        previous.close();
    }
    URI uri = new URI("ws://127.0.0.1:" + PORT);
    WebSocketClient client = new WebSocketClient(uri) {
        @Override
        public void onOpen(ServerHandshake handshake) {
            System.out.println("API服务连接成功");
            isReconnecting.set(false);
        }
        @Override
        public void onMessage(String message) {
            handleMessage(gson.fromJson(message, JsonObject.class));
        }
        @Override
        public void onMessage(ByteBuffer bytes) {
            // Copy the readable region; array() ignores position/offset and
            // is unavailable for direct or read-only buffers.
            byte[] data = new byte[bytes.remaining()];
            bytes.get(data);
            try {
                int metaLength = Integer.parseInt(new String(data, 0, 6, StandardCharsets.UTF_8).trim());
                String metaJson = new String(data, 6, metaLength, StandardCharsets.UTF_8);
                JsonObject meta = gson.fromJson(metaJson, JsonObject.class);
                boolean isError = meta != null && meta.has("type")
                        && "error".equals(meta.get("type").getAsString());
                if (meta != null && !isError && meta.has("evtid")) {
                    // Hand the binary payload to the caller instead of dropping it.
                    CompletableFuture<Object> pending = events.remove(meta.get("evtid").getAsString());
                    if (pending != null) {
                        byte[] body = new byte[data.length - 6 - metaLength];
                        System.arraycopy(data, 6 + metaLength, body, 0, body.length);
                        pending.complete(body);
                    }
                    return;
                }
                handleMessage(meta);
            } catch (RuntimeException ex) {
                // Covers a short frame, a non-numeric header and invalid metadata JSON.
                System.err.println("API服务连接错误: " + ex.getMessage());
            }
        }
        @Override
        public void onClose(int code, String reason, boolean remote) {
            System.out.println("API服务连接关闭: " + code);
            // Only the current client may trigger a reconnect.
            if (wsClient == this) {
                reconnect();
            }
        }
        @Override
        public void onError(Exception ex) {
            System.err.println("API服务连接错误: " + ex.getMessage());
        }
    };
    wsClient = client;
    client.connect();
}
/**
 * Completes the pending request that matches the payload's evtid.
 * Payloads without an evtid, or with an evtid nobody is waiting for, are ignored.
 *
 * @param payload decoded JSON message; may be null when the message was empty
 */
private void handleMessage(JsonObject payload) {
    if (payload == null || !payload.has("evtid")) return;
    String evtid = payload.get("evtid").getAsString();
    CompletableFuture<Object> future = events.remove(evtid);
    if (future == null) return;
    boolean isError = payload.has("type") && "error".equals(payload.get("type").getAsString());
    if (isError) {
        // The future is already out of the map here, so an exception thrown
        // for a missing "error" field would leave the caller waiting forever.
        String message = payload.has("error") && !payload.get("error").isJsonNull()
                ? payload.get("error").getAsString()
                : "未知错误";
        future.completeExceptionally(new Exception(message));
    } else {
        future.complete(payload.get("data"));
    }
}
/**
 * Schedules one reconnect attempt three seconds from now, unless one is already scheduled.
 */
private void reconnect() {
    if (!isReconnecting.compareAndSet(false, true)) return;
    scheduler.schedule(() -> {
        try {
            System.out.println("API服务重连中...");
            connect();
        } catch (Exception e) {
            System.err.println("重连失败: " + e.getMessage());
        } finally {
            // connect() only starts the handshake. Release the guard now so
            // the onClose of a failed attempt can schedule the next retry;
            // resetting it only in onOpen stopped retries after one failure.
            isReconnecting.set(false);
        }
    }, 3, TimeUnit.SECONDS);
}
/**
 * Builds a random 10-character event id from the CHARS alphabet.
 *
 * @return the new event id
 */
private static String generateEventId() {
    char[] id = new char[10];
    for (int pos = 0; pos < id.length; pos++) {
        int pick = RANDOM.nextInt(CHARS.length());
        id[pos] = CHARS.charAt(pick);
    }
    return new String(id);
}
/**
 * Sends one API request and returns a future for its response.
 *
 * @param type    API name, sent as the "type" field
 * @param params  extra request fields, may be null; "type", "evtid" and
 *                "timeout" always take precedence over same-named keys
 * @param timeout seconds to wait; values <= 0 disable the client-side timeout
 * @return future completed with the response data, or exceptionally when the
 *         client is not connected, sending fails, the server reports an error
 *         or the timeout elapses
 */
public CompletableFuture<Object> invoke(String type, Map<String, Object> params, int timeout) {
    CompletableFuture<Object> future = new CompletableFuture<>();
    // Read the field once so a concurrent connect() cannot swap it mid-call.
    WebSocketClient client = wsClient;
    if (client == null || !client.isOpen()) {
        future.completeExceptionally(new Exception("API服务连接未就绪"));
        return future;
    }
    String evtid = generateEventId();
    Map<String, Object> payload = params != null ? new HashMap<>(params) : new HashMap<>();
    payload.put("type", type);
    payload.put("evtid", evtid);
    payload.put("timeout", timeout);
    events.put(evtid, future);
    try {
        client.send(gson.toJson(payload));
    } catch (RuntimeException e) {
        // The socket may close between the isOpen() check and send().
        events.remove(evtid);
        future.completeExceptionally(e);
        return future;
    }
    if (timeout > 0) {
        ScheduledFuture<?> timer = scheduler.schedule(() -> {
            CompletableFuture<Object> f = events.remove(evtid);
            if (f != null) {
                f.completeExceptionally(new TimeoutException("API服务超时"));
            }
        }, timeout, TimeUnit.SECONDS);
        // Drop the timer as soon as the request is answered, so finished
        // calls do not pile up in the scheduler queue.
        future.whenComplete((result, error) -> timer.cancel(false));
    }
    return future;
}
/**
 * Shuts the client down for good: no further reconnects, pending requests
 * fail, and the socket is closed. The instance cannot be reused afterwards.
 */
public void close() {
    // Keep reconnect() from scheduling on the scheduler that is about to be
    // shut down (its compareAndSet(false, true) now fails); otherwise the
    // socket's onClose callback throws RejectedExecutionException.
    isReconnecting.set(true);
    scheduler.shutdownNow();
    // Timeout tasks were just cancelled, so fail whatever is still waiting.
    for (String evtid : events.keySet()) {
        CompletableFuture<Object> pending = events.remove(evtid);
        if (pending != null) {
            pending.completeExceptionally(new Exception("API服务连接已关闭"));
        }
    }
    if (wsClient != null) {
        wsClient.close();
    }
}
// Usage example
/**
 * Connects, sends a single click request, prints the outcome and closes the client.
 */
public static void main(String[] args) throws Exception {
    IClickClient client = new IClickClient();
    client.connect();
    // connect() only starts the handshake; wait a moment before sending.
    Thread.sleep(1000);

    Map<String, Object> clickArgs = new HashMap<>();
    clickArgs.put("x", 100);
    clickArgs.put("y", 200);
    clickArgs.put("deviceId", "XXXXXXXXXX");

    CompletableFuture<Object> pending = client.invoke("click", clickArgs, 18);
    CompletableFuture<Void> reported = pending
        .thenAccept(result -> System.out.println("点击结果: " + result))
        .exceptionally(ex -> {
            System.err.println("调用失败: " + ex.getMessage());
            return null;
        });
    reported.thenRun(client::close);
}
}
csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
public class IClickClient
{
// Port of the local API server; interpolated into the ws://127.0.0.1 URI by ConnectAsync.
private const int Port = 23188;
// Current socket; replaced on every ConnectAsync call.
private ClientWebSocket wsClient;
// Pending requests: evtid -> completion source awaited by InvokeAsync.
private ConcurrentDictionary<string, TaskCompletionSource<object>> events = new();
// Cancellation source tied to the current connection; cancelled when the connection is replaced.
private CancellationTokenSource cts;
// Serializes reconnect attempts.
private readonly SemaphoreSlim reconnectLock = new(1, 1);
private bool isReconnecting = false;
/// <summary>
/// Replaces the current connection with a fresh one and starts its receive loop.
/// Any previous socket is closed (best effort) and disposed first, and the
/// previous cancellation source is cancelled.
/// </summary>
public async Task ConnectAsync()
{
    // Tear down the old connection
    if (wsClient != null)
    {
        try
        {
            await wsClient.CloseAsync(WebSocketCloseStatus.NormalClosure, "重连", CancellationToken.None);
        }
        catch
        {
            // Best effort: the old socket may already be broken.
        }
        wsClient?.Dispose();
    }

    cts?.Cancel();
    cts?.Dispose();
    cts = new CancellationTokenSource();

    wsClient = new ClientWebSocket();
    var endpoint = new Uri($"ws://127.0.0.1:{Port}");
    await wsClient.ConnectAsync(endpoint, cts.Token);
    Console.WriteLine("API服务连接成功");

    _ = Task.Run(MessageHandler);
}
/// <summary>
/// Receive loop for one connection: reassembles fragmented messages and
/// dispatches text frames as JSON and binary frames as metadata + payload.
/// Triggers a reconnect when the server closes the socket or receiving fails.
/// </summary>
private async Task MessageHandler()
{
    // Bind to the socket and token this loop was started for; ConnectAsync
    // replaces both fields when it builds the next connection.
    var socket = wsClient;
    var token = cts.Token;
    var messageBuffer = new List<byte>();
    var buffer = new byte[8192];
    try
    {
        while (socket.State == WebSocketState.Open && !token.IsCancellationRequested)
        {
            var result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), token);
            if (result.MessageType == WebSocketMessageType.Close)
            {
                Console.WriteLine("API服务连接关闭");
                await ReconnectAsync();
                return;
            }
            // Collect fragments until the message is complete
            messageBuffer.AddRange(new ArraySegment<byte>(buffer, 0, result.Count));
            if (!result.EndOfMessage)
            {
                continue;
            }
            byte[] message = messageBuffer.ToArray();
            messageBuffer.Clear();
            try
            {
                if (result.MessageType == WebSocketMessageType.Text)
                {
                    HandleJsonMessage(Encoding.UTF8.GetString(message));
                }
                else if (result.MessageType == WebSocketMessageType.Binary)
                {
                    HandleBinaryMessage(message);
                }
            }
            catch (Exception ex)
            {
                // A single malformed message must not tear down a healthy connection.
                Console.WriteLine($"解析消息失败: {ex.Message}");
            }
        }
    }
    catch (Exception ex)
    {
        // The token is cancelled only when ConnectAsync replaces this
        // connection; reconnecting from the superseded loop would fight it.
        if (token.IsCancellationRequested)
        {
            return;
        }
        Console.WriteLine($"API服务连接错误: {ex.Message}");
        await ReconnectAsync();
    }
}
/// <summary>Decodes a text frame as a JSON object and dispatches it.</summary>
private void HandleJsonMessage(string message) =>
    ProcessPayload(JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(message));
/// <summary>
/// Splits a binary frame laid out as [6-byte ASCII metadata length][JSON metadata][payload]
/// and dispatches the metadata together with a copy of the payload bytes.
/// </summary>
private void HandleBinaryMessage(byte[] data)
{
    const int HeaderSize = 6;
    int metaLength = int.Parse(Encoding.UTF8.GetString(data, 0, HeaderSize).Trim());
    string metaJson = Encoding.UTF8.GetString(data, HeaderSize, metaLength);
    var meta = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(metaJson);

    int bodyStart = HeaderSize + metaLength;
    var body = new byte[data.Length - bodyStart];
    Array.Copy(data, bodyStart, body, 0, body.Length);

    ProcessPayload(meta, body);
}
/// <summary>
/// Completes the pending request that matches the payload's evtid.
/// Payloads without a string evtid, or with one nobody is waiting for, are ignored.
/// </summary>
/// <param name="payload">Decoded JSON metadata; may be null.</param>
/// <param name="binaryData">Payload bytes of a binary frame, or null for text frames.</param>
private void ProcessPayload(Dictionary<string, JsonElement> payload, byte[] binaryData = null)
{
    if (payload == null
        || !payload.TryGetValue("evtid", out var evtidElement)
        || evtidElement.ValueKind != JsonValueKind.String)
    {
        return;
    }
    if (!events.TryRemove(evtidElement.GetString(), out var tcs))
    {
        return;
    }

    bool isError = payload.TryGetValue("type", out var typeElement)
        && typeElement.ValueKind == JsonValueKind.String
        && typeElement.GetString() == "error";
    if (isError)
    {
        // The entry is already removed, so throwing on a missing "error"
        // field here would leave the awaiting caller hanging forever.
        string message = payload.TryGetValue("error", out var errorElement)
            ? errorElement.ToString()
            : "未知错误";
        tcs.TrySetException(new Exception(message));
        return;
    }

    // Cast first: byte[] and JsonElement have no common type for ??.
    object data = (object)binaryData ?? payload.GetValueOrDefault("data");
    tcs.TrySetResult(data);
}
/// <summary>
/// Retries the connection every three seconds until it succeeds.
/// Concurrent callers are serialized; a caller that finds the socket already
/// open after waiting for the lock returns without reconnecting again.
/// </summary>
private async Task ReconnectAsync()
{
    if (isReconnecting) return;
    await reconnectLock.WaitAsync();
    try
    {
        // Another caller may have restored the connection while this one waited.
        if (wsClient?.State == WebSocketState.Open) return;
        isReconnecting = true;
        while (true)
        {
            await Task.Delay(3000);
            Console.WriteLine("API服务重连中...");
            try
            {
                await ConnectAsync();
                return;
            }
            catch (Exception ex)
            {
                // Keep retrying: letting this escape ended reconnection after
                // the first failed attempt.
                Console.WriteLine($"API服务连接错误: {ex.Message}");
            }
        }
    }
    finally
    {
        isReconnecting = false;
        reconnectLock.Release();
    }
}
/// <summary>Returns the first 10 hex characters of a fresh GUID as the event id.</summary>
private static string GenerateEventId()
{
    string hex = Guid.NewGuid().ToString("N");
    return hex.Substring(0, 10);
}
/// <summary>
/// Sends one API request and waits for the response carrying the same evtid.
/// </summary>
/// <param name="type">API name, sent as the "type" field.</param>
/// <param name="parameters">Extra request fields, may be null; "type", "evtid" and "timeout" take precedence.</param>
/// <param name="timeout">Seconds to wait; values &lt;= 0 disable the client-side timeout.</param>
/// <returns>The response data (payload bytes for binary responses).</returns>
/// <exception cref="TimeoutException">No response arrived within the timeout.</exception>
/// <exception cref="Exception">The client is not connected or the server reported an error.</exception>
public async Task<object> InvokeAsync(string type, Dictionary<string, object> parameters = null, int timeout = 18)
{
    if (wsClient == null || wsClient.State != WebSocketState.Open)
    {
        throw new Exception("API服务连接未就绪");
    }
    string evtid = GenerateEventId();
    var payload = new Dictionary<string, object>(parameters ?? new())
    {
        ["type"] = type,
        ["evtid"] = evtid,
        ["timeout"] = timeout
    };
    // Run continuations off the receive loop, so the awaiting caller cannot
    // stall message processing.
    var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
    events[evtid] = tcs;
    try
    {
        byte[] data = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(payload));
        await wsClient.SendAsync(new ArraySegment<byte>(data), WebSocketMessageType.Text, true, cts.Token);
    }
    catch
    {
        // Do not leave an entry behind for a request that was never sent.
        events.TryRemove(evtid, out _);
        throw;
    }
    if (timeout <= 0)
    {
        return await tcs.Task;
    }
    // Timeout handling
    using var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(timeout));
    using var registration = timeoutCts.Token.Register(() =>
    {
        if (events.TryRemove(evtid, out var t))
        {
            t.TrySetException(new TimeoutException("API服务超时"));
        }
    });
    return await tcs.Task;
}
// Usage example
/// <summary>
/// Connects and sends a single click request. Connection failures and API
/// errors are both reported through the same catch block.
/// </summary>
public static async Task Main(string[] args)
{
    var client = new IClickClient();
    var parameters = new Dictionary<string, object>
    {
        ["x"] = 100,
        ["y"] = 200,
        ["deviceId"] = "XXXXXXXXXX"
    };
    try
    {
        // ConnectAsync throws when the server is unreachable, so it belongs inside the try.
        await client.ConnectAsync();
        var result = await client.InvokeAsync("click", parameters);
        Console.WriteLine($"点击结果: {result}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"调用失败: {ex.Message}");
    }
}
}
go
package main
import (
"encoding/json"
"fmt"
"math/rand"
"sync"
"time"
"github.com/gorilla/websocket"
)
// IClickClient is a WebSocket client for the local API server.
type IClickClient struct {
// port of the server on 127.0.0.1.
port int
// conn is the current connection; replaced by Connect.
conn *websocket.Conn
// events maps the evtid of each pending request to the channel its caller waits on.
events map[string]chan Response
// eventsMutex guards events.
eventsMutex sync.RWMutex
// reconnectMutex guards isReconnecting.
reconnectMutex sync.Mutex
// isReconnecting is true while a reconnect attempt is in progress.
isReconnecting bool
}
// Response is what the receive loop hands back to a waiting Invoke call.
type Response struct {
// Data is the response payload (raw bytes for binary responses).
Data interface{}
// Error is set when the server answered with type "error".
Error error
}
// Payload is the JSON shape of messages received from the server.
type Payload struct {
Type string `json:"type,omitempty"`
// EvtID ties a response to the request that carried the same id.
EvtID string `json:"evtid,omitempty"`
Timeout int `json:"timeout,omitempty"`
Event string `json:"event,omitempty"`
// Data is replaced with the raw trailing bytes when the message is binary.
Data interface{} `json:"data,omitempty"`
Error string `json:"error,omitempty"`
}
// NewIClickClient returns a client for port 23188 with an empty
// pending-request table. It does not connect.
func NewIClickClient() *IClickClient {
	client := new(IClickClient)
	client.port = 23188
	client.events = map[string]chan Response{}
	return client
}
// Connect closes any previous connection, dials the server and starts the
// receive loop in a new goroutine. It returns an error when dialing fails.
func (c *IClickClient) Connect() error {
	// Close the previous connection first
	if c.conn != nil {
		c.conn.Close()
	}

	endpoint := fmt.Sprintf("ws://127.0.0.1:%d", c.port)
	fresh, _, dialErr := websocket.DefaultDialer.Dial(endpoint, nil)
	if dialErr != nil {
		return fmt.Errorf("连接失败: %w", dialErr)
	}

	c.conn = fresh
	fmt.Println("API服务连接成功")
	go c.messageHandler()
	return nil
}
// messageHandler is the receive loop of one connection. Text messages are
// decoded as JSON; binary messages are laid out as
// [6-byte ASCII metadata length][JSON metadata][payload]. When reading fails
// the loop ends and a reconnect is started.
func (c *IClickClient) messageHandler() {
	// Bind to the connection this loop was started for, so that it never
	// reads from a connection that a later Connect call installed.
	conn := c.conn
	defer func() {
		fmt.Println("API服务连接关闭")
		c.reconnect()
	}()
	const headerSize = 6
	for {
		messageType, message, err := conn.ReadMessage()
		if err != nil {
			return
		}
		var payload Payload
		switch messageType {
		case websocket.TextMessage:
			if err := json.Unmarshal(message, &payload); err != nil {
				fmt.Printf("解析JSON错误: %v\n", err)
				continue
			}
		case websocket.BinaryMessage:
			// Validate the header before slicing: an out-of-range slice
			// would panic and take the whole program down.
			if len(message) < headerSize {
				fmt.Printf("解析元数据错误: %v\n", fmt.Errorf("message shorter than %d-byte header", headerSize))
				continue
			}
			var metaLength int
			if _, err := fmt.Sscanf(string(message[:headerSize]), "%d", &metaLength); err != nil {
				fmt.Printf("解析元数据错误: %v\n", err)
				continue
			}
			if metaLength < 0 || headerSize+metaLength > len(message) {
				fmt.Printf("解析元数据错误: %v\n", fmt.Errorf("metadata length %d out of range", metaLength))
				continue
			}
			if err := json.Unmarshal(message[headerSize:headerSize+metaLength], &payload); err != nil {
				fmt.Printf("解析元数据错误: %v\n", err)
				continue
			}
			payload.Data = message[headerSize+metaLength:]
		default:
			// Other message types carry nothing to dispatch.
			continue
		}
		c.processPayload(payload)
	}
}
// processPayload hands a response to the Invoke call waiting on its evtid.
// Payloads without an evtid, or with one nobody is waiting for, are dropped.
func (c *IClickClient) processPayload(payload Payload) {
	if payload.EvtID == "" {
		return
	}
	// Look up and remove under one lock, so an entry can be claimed only
	// once even if a timeout in Invoke races with the response.
	c.eventsMutex.Lock()
	ch, exists := c.events[payload.EvtID]
	if exists {
		delete(c.events, payload.EvtID)
	}
	c.eventsMutex.Unlock()
	if !exists {
		return
	}
	response := Response{Data: payload.Data}
	if payload.Type == "error" {
		response.Error = fmt.Errorf("%s", payload.Error)
	}
	// The channel has capacity 1 and this is its only send, so it cannot block.
	ch <- response
}
// reconnect retries Connect every three seconds until it succeeds.
// Only one reconnect runs at a time; concurrent calls return immediately.
func (c *IClickClient) reconnect() {
	c.reconnectMutex.Lock()
	if c.isReconnecting {
		c.reconnectMutex.Unlock()
		return
	}
	c.isReconnecting = true
	c.reconnectMutex.Unlock()

	defer func() {
		c.reconnectMutex.Lock()
		c.isReconnecting = false
		c.reconnectMutex.Unlock()
	}()

	// Keep trying. A failed dial starts no receive loop, so nothing else
	// would trigger another attempt if this gave up after one failure.
	for {
		time.Sleep(3 * time.Second)
		fmt.Println("API服务重连中...")
		err := c.Connect()
		if err == nil {
			return
		}
		fmt.Printf("重连失败: %v\n", err)
	}
}
// generateEventID builds an id from the sub-second nanoseconds of the current
// time followed by a zero-padded three-digit random number.
func (c *IClickClient) generateEventID() string {
	nanos := time.Now().UnixNano() % 1000000000
	suffix := rand.Intn(1000)
	return fmt.Sprintf("%d%03d", nanos, suffix)
}
// Invoke sends one API request and blocks until its response arrives or the
// timeout elapses.
//
// params may be nil; "type", "evtid" and "timeout" always take precedence
// over same-named keys in it. timeout is in seconds; values <= 0 disable the
// client-side timeout. The returned error is non-nil when the client has no
// connection, the request cannot be encoded or sent, the server reports an
// error, or the timeout elapses.
func (c *IClickClient) Invoke(apiType string, params map[string]interface{}, timeout int) (interface{}, error) {
	if c.conn == nil {
		return nil, fmt.Errorf("API服务连接未就绪")
	}
	evtID := c.generateEventID()
	// Copy params first so the reserved keys below cannot be overridden,
	// which would break request/response matching.
	payload := make(map[string]interface{}, len(params)+3)
	for k, v := range params {
		payload[k] = v
	}
	payload["type"] = apiType
	payload["evtid"] = evtID
	payload["timeout"] = timeout

	// Encode before registering, so an unencodable request leaves no entry behind.
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("发送消息失败: %w", err)
	}

	ch := make(chan Response, 1)
	c.eventsMutex.Lock()
	c.events[evtID] = ch
	c.eventsMutex.Unlock()

	unregister := func() {
		c.eventsMutex.Lock()
		delete(c.events, evtID)
		c.eventsMutex.Unlock()
	}

	if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil {
		unregister()
		return nil, fmt.Errorf("发送消息失败: %w", err)
	}

	// A nil channel never fires, which is how "no timeout" is expressed.
	var expired <-chan time.Time
	if timeout > 0 {
		timer := time.NewTimer(time.Duration(timeout) * time.Second)
		// Release the timer as soon as the call returns.
		defer timer.Stop()
		expired = timer.C
	}

	select {
	case response := <-ch:
		return response.Data, response.Error
	case <-expired:
		unregister()
		return nil, fmt.Errorf("API服务超时")
	}
}
// Close closes the current connection, if there is one, and returns the
// error reported by the underlying Close call.
func (c *IClickClient) Close() error {
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}
// 使用示例
func main() {
client := NewIClickClient()
if err := client.Connect(); err != nil {
fmt.Printf("连接失败: %v\n", err)
return
}
defer client.Close()
time.Sleep(time.Second)
params := map[string]interface{}{
"x": 100,
"y": 200,
"deviceId": "XXXXXXXXXX",
}
result, err := client.Invoke("click", params, 18)
if err != nil {
fmt.Printf("调用失败: %v\n", err)
return
}
fmt.Printf("点击结果: %v\n", result)
}
下一步
- 阅读更具体的 API 使用示例