通信

Http

requestsopen in new window 是 Python 的Http 请求库,AirScript中内置了这个库

# 导包
import requrest

模版


import requests

# 请求模版
r = requests.get("http://www.baidu.com") 
# 打印状态Code
print(r.status_code) 
# 打印header 中的 content-type
print(r.headers['content-type']) 
#打印字符编码
print(r.encoding) 
#打印返回文本内容
print(r.text) 

# 如果服务器返回json,可使用以下语句进行json转换
# r.json() 

get 案例

import requests
# 访问天气预报接口
r = requests.get("http://www.weather.com.cn/data/sk/101010100.html")
# 进行转码
r.encoding = r.apparent_encoding
# 打印获取的所有文本信息
print(r.text)
# 转换为json对象
obj =  r.json()

print(obj['weatherinfo']['city']) # 获取当前城市名称

post 案例

  • Form 形式发送
import requests
# 使用post请求获取 ip所属信息
r = requests.post("https://api.oioweb.cn/api/ip/ipaddress",data ={"ip":"114.114.114.114"})

# 打印一下返回的文本
print(r.text)

# 看文本格式,服务器返回的是json数据,转换为json对象,方便拿属性
obj = r.json()

#获取 ip所在地址
print(obj["result"]["disp"])

  • Json 形式发送
import requests
import json
# 访问地址
url = 'http://httpbin.org/post'

# json 格式化
s = json.dumps({'key1': 'value1', 'key2': 'value2'})

# 发送 post请求
r = requests.post(url, data=s)

# 打印文本
print (r.text)

下载

  • 小文件下载
# 导入http模块
import requests
# 导入环境R,方便获取文件路径
from airscript.system import R

# 指定下载文件的地址
url = 'https://www.baidu.com/img/flexible/logo/pc/result@2.png' # 目标下载链接

# 通过get获取数据
r = requests.get(url)

# 保存文件至sd卡下的1.png
with open (R.sd("/1.png"), 'wb') as f:
    f.write(r.content)
  • 大文件下载
# 导入http库
import requests
# 导入环境R,方便得到存储文件地址
from airscript.system import R
# 下载文件地址
url = 'https://www.baidu.com/img/flexible/logo/pc/result@2.png' # 目标下载链接

# get获取数据,传入stream 分批流形式获取数据
r = requests.get(url, stream=True)

# 打开sd卡文件,准备写入数据
f = open(R.sd("/2.png"), "wb")

# 循环批次写入数据
for chunk in r.iter_content(chunk_size=512):
  if chunk:
    f.write(chunk)

#关闭文件
f.close()

WebSocket

长连接通信通道

websocket-client

AirScript 内置了 websocket-client Python包

我们可以使用该方法快速连接某一WebSocketServer

下面是使用案例:


from websocket import WebSocketApp

def on_message(self, message):
    print("####### on_message #######")
    print("message:%s" % message)

def on_error(self, error):
    print("####### on_error #######")
    print("error:%s" % error)

def on_close(self):
    print("####### on_close #######")

def on_open(self):
    print("####### on_open #######")


url = "ws://192.168.31.108:10102/log/"

ws = WebSocketApp(url,
                    on_open=on_open,
                    on_message=on_message,
                    on_error=on_error,
                    on_close=on_close)
ws.run_forever()

更新日期:
Contributors: 自在