1
1.1 Cargo.toml
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
lazy_static = "1.4.0"
egg-mode = "0.16"
1.2 code
#[macro_use]
extern crate lazy_static;
use reqwest;
use reqwest::{header, Client, Result};
use serde::Deserialize;
use std::collections::HashSet;
use tokio;
use tokio::sync::watch;
use tokio::time::{self, sleep, Duration};
const tweet_url: &'static str = "https://api.twitter.com/2/users/:id/tweets?max_results=1";
const huobi_url: &'static str = "https://api.huobi.pro/market/detail/merged?"; // 火币api
lazy_static! {
static ref bear_token: String = std::env::var_os("BearToken").unwrap().into_string();
static ref client: Client = Client::new();
// client for build request
static ref tweet_client: Client = {
let mut h = header::HeaderMap::new();
h.insert("Authorization",format!("Bearer {}", bear_token).parse().unwrap());
h.insert("Content-Type", "application/json".parse().unwrap());
Client::builder()
.default_headers(h)
.build()
.unwrap()
};
static ref url : String = format!("{}?usernames=elonmusk", tweet_url);
}
// see https://huobiapi.github.io/docs/spot/v1/cn/#ticker for more info
// reponse struct of https://api.huobi.pro/market/detail/merged
#[derive(Deserialize, Debug)]
struct PriceResponse {
ch: String,
status: String,
ts: u64,
tick: Tick,
}
impl PriceResponse {
fn get_tick_close(&self) -> f32 {
self.tick.close
}
}
#[derive(Deserialize, Debug)]
struct Tick {
id: u64,
amount: f32,
count: u32,
open: f32,
close: f32,
low: f32,
high: f32,
vol: f32,
bid: [f32; 2],
ask: [f32; 2],
}
// get price
async fn __get_price(kind: &str) -> Result<f32> {
let btc_price_query = format!("{}symbol={}usdt", huobi_url, kind);
let current_btc_price = (*client)
.get(btc_price_query)
.send()
.await?
.json::<PriceResponse>()
.await?;
let cur_btc = current_btc_price.get_tick_close();
Ok(cur_btc)
}
async fn get_price_btc() -> Result<f32> {
__get_price("btc").await
}
// get musk's tweet
async fn get_tweet() -> Result<String> {
(*tweet_client).get(&*tweet_url).send().await?.text().await
}
#[tokio::main]
async fn main() -> Result<()> {
let (tx, mut rx) = watch::channel(0.0f32);
let _send_btc_price = tokio::spawn(async move {
// 1. 先初始化定时器, 休眠30s=1000ms*30
let sleep = sleep(Duration::from_millis(10));
tokio::pin!(sleep); // pin住
// 2. 得到最近一次的btc价格, 作为初始化
let mut old_btc_price = get_price_btc().await.unwrap();
loop {
// 3. 触发的事件
tokio::select! {
_= rx.changed() => {
//3.1 马斯克发推特了
//3.1.1 发送报警
println!("Alert: musk's new tweet!!!");
//3.1.2 更新比特币价格
old_btc_price =*rx.borrow();
}
_ = &mut sleep => {
// 3.2 马斯克没有发推特时
// 3.2.1 得到当前比特币价格
let cur_btc = get_price_btc().await;
if let Ok(cur_btc) = cur_btc {
// 3.2.2 计算变化率
println!(
"old btc price: {:?}. current btc price: {:?}. rate is {:?}",
old_btc_price,
cur_btc,
(cur_btc - old_btc_price) / old_btc_price
);
}
}
}
}
});
let mut tweet_set = HashSet::new();
loop {
// http get method
let new_tweet = get_tweet().await?;
println!("new is {:?}", new_tweet);
if !tweet_set.contains(new_tweet) {
// 有新推特
tweet_set.insert(new_tweet);
// get BTC price
let cur_btc = get_price_btc().await?;
tx.send(cur_btc);
} else {
// 没有新推特
sleep(Duration::from_millis(1000 * 60)).await;
}
}
Ok(())
}
2.
2.1 toml
[dependencies]
tokio = { version = "1.0", features = ["full"] }
async-compression = { version = "0.3", features = ["all"] }
use async_compression::tokio::bufread::LzmaDecoder;
use std::collections::HashMap;
use std::io::{self, Error, ErrorKind, Read, Result};
use std::iter::FromIterator;
use std::path::Path;
use tokio::fs::{self, OpenOptions};
use tokio::io::{AsyncBufReadExt, BufReader};
struct OneLine {
dumpns: u64,
shmId: usize,
tradecenter: String,
preCoin: String,
postCoin: String,
klinetime: u64,
start: f32,
max: f32,
min: f32,
end: f32,
count: f32,
}
macro_rules! parse {
($iter: expr) => {
$iter.next().and_then(|x| x.parse().ok()).unwrap()
};
($iter:expr, $t:ty) => {
$iter.next().and_then(|x| x.parse::<$t>().ok()).unwrap()
};
}
impl<'a> FromIterator<&'a str> for OneLine {
fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
let mut iter = iter.into_iter();
OneLine {
dumpns: parse!(iter),
shmId: parse!(iter),
tradecenter: parse!(iter),
preCoin: parse!(iter),
postCoin: parse!(iter),
klinetime: parse!(iter),
start: parse!(iter),
max: parse!(iter),
min: parse!(iter),
end: parse!(iter),
count: parse!(iter),
}
}
}
impl OneLine {
fn new(s: String) -> Self {
s.trim().split('\t').collect()
}
}
#[tokio::main]
async fn main() -> Result<()> {
let mut input = std::io::stdin();
let mut buf = String::new();
// 1. read file path
println!("input data path");
input.read_to_string(&mut buf)?;
let dir_path = Path::new(&buf); // 压缩包的目录(相对路径)
let mut entries = fs::read_dir(&dir_path).await?;
//2. 读取目录+解压+记录数据
let mut data_collection = HashMap::new();
while let Some(entry) = entries.next_entry().await? {
//读取目录
let rel_path = entry.path();
// 2.1 读取每个文件
let file = OpenOptions::new()
.read(true)
.open(dir_path.join(rel_path))
.await?;
let buf_reader = BufReader::new(file);
// 2.2 解压
let buf_reader = LzmaDecoder::new(buf_reader);
let buf_reader = BufReader::new(buf_reader);
// 2.3 读取每一行
let mut lines = buf_reader.lines();
while let Some(line) = lines.next_line().await? {
// 2.4 根据每一行内容构建结构体
let data = OneLine::new(line);
let end = data.end;
let old = data_collection
.entry((data.preCoin.clone(), data.postCoin.clone()))
.or_insert(data);
(*old).end = end;
}
}
// 3. 写入一分钟的涨幅
for (k, mut v) in &mut data_collection {
v.end *= 0.01;
}
buf.clear();
// 4. 输入现在要交易的内容
println!("Please input pair");
println!("input format is(space is delimeter): from to mount ");
input.read_to_string(&mut buf)?;
let mut s_iter = buf.split(' ');
let from = parse!(s_iter, String);
let to = parse!(s_iter, String);
let amount = parse!(s_iter, f32);
// 5. 获取结果
let data = data_collection
.get(&(from, to))
.ok_or(Error::new(ErrorKind::Other, "oh no such pair!"))?;
let rate = (data.end - data.start) / ((9 * 60) as f32 + 1.0);
let profit = rate * amount;
println!("profit is: {}", profit);
Ok(())
}