Add example of using shioajidownloader from sinopac data source (#1266)
* Add files via upload

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Charliesj0129 and pre-commit-ci[bot] authored Aug 5, 2024
1 parent 5d5fbe3 commit 2bdf524
Showing 3 changed files with 245 additions and 125 deletions.
117 changes: 117 additions & 0 deletions example_of_shioaji_api.py
@@ -0,0 +1,117 @@
from __future__ import annotations

import datetime
import gc
import itertools

import numpy as np
import pandas as pd
import yfinance as yf

from finrl import config_tickers
from finrl.config import INDICATORS
from finrl.meta.preprocessor.preprocessors import data_split
from finrl.meta.preprocessor.preprocessors import FeatureEngineer
from processor_sinopac import SinopacProcessor
from shioajidownloader import SinopacDownloader

TAI_0050_TICKER = [
    "3008",  # Largan Precision Co., Ltd.
    "1303",  # Nan Ya Plastics Corporation
    "2412",  # Chunghwa Telecom Co., Ltd.
    "1301",  # Formosa Plastics Corporation
    "1216",  # Uni-President Enterprises Corporation
    "2881",  # Fubon Financial Holding Co., Ltd.
    "2882",  # Cathay Financial Holding Co., Ltd.
    "5871",  # Chailease Holding Company Limited
    "2886",  # Mega Financial Holding Co., Ltd.
    "2891",  # CTBC Financial Holding Co., Ltd.
    "2884",  # E.SUN Financial Holding Co., Ltd.
    "5880",  # Taiwan Cooperative Financial Holding Co., Ltd.
    "2883",  # China Development Financial Holding Corporation
    "2892",  # First Financial Holding Co., Ltd.
    "2880",  # Hua Nan Financial Holdings Co., Ltd.
    "2303",  # United Microelectronics Corporation
    "1326",  # Formosa Chemicals & Fibre Corporation
    "1101",  # Taiwan Cement Corp.
    "3006",  # Elite Semiconductor Memory Technology Inc.
    "3045",  # Taiwan Mobile Co., Ltd.
    "2912",  # President Chain Store Corporation
    "2327",  # Yageo Corporation
    "1304",  # USI Corporation
    "2379",  # Realtek Semiconductor Corp.
    "2801",  # Chang Hwa Commercial Bank, Ltd.
    "1402",  # Far Eastern New Century Corporation
    "2345",  # Accton Technology Corporation
    "2301",  # Lite-On Technology Corporation
    "2408",  # Nanya Technology Corporation
    "2357",  # Asustek Computer Inc.
    "9910",  # Feng Tay Enterprises Co., Ltd.
    "2395",  # Advantech Co., Ltd.
    "2353",  # Acer Incorporated
    "2354",  # Foxconn Technology Co., Ltd.
    "3711",  # ASE Technology Holding Co., Ltd.
    "2890",  # SinoPac Financial Holdings Company Limited
    "2377",  # Micro-Star International Co., Ltd.
    "4904",  # Far EasTone Telecommunications Co., Ltd.
    "2324",  # Compal Electronics, Inc.
    "2305",  # Microtek International Inc.
    "1102",  # Asia Cement Corporation
    "9933",  # CTCI Corporation
]

TRAIN_START_DATE = "2023-04-13"
TRAIN_END_DATE = "2024-04-13"
TRADE_START_DATE = "2024-04-13"
TRADE_END_DATE = "2024-07-31"
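# Note: FinRL's data_split keeps rows with start <= date < end (end-exclusive),
# so TRAIN_END_DATE == TRADE_START_DATE does not place 2024-04-13 in both splits.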


def process_ticker_data(ticker):
print(f"Processing data for ticker: {ticker}")
df_raw = SinopacDownloader(
start_date=TRAIN_START_DATE, end_date=TRADE_END_DATE, ticker_list=[ticker]
).fetch_data()

df_raw.rename(
columns={
"open": "Open",
"high": "High",
"low": "Low",
"close": "Close",
"volume": "Volume",
"amount": "Amount",
},
inplace=True,
)
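    # SinopacProcessor expects these capitalized OHLCV names downstream:
    # clean_individual_ticker joins on ["Open", "High", "Low", "Close", "Volume", "Amount"].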

    processor = SinopacProcessor(
        API_KEY="YOUR_API_KEY",  # placeholders: never commit real credentials
        API_SECRET="YOUR_API_SECRET",
    )

cleaned_df = processor.clean_data(df_raw)
df_with_indicators = processor.add_technical_indicator(cleaned_df)
df_with_vix = processor.add_vix(df_with_indicators)
df_with_turbulence = processor.add_turbulence(df_with_vix, time_period=252)

# Save processed data for each ticker to a separate file
df_with_turbulence.to_csv(f"data_{ticker}.csv")

# Explicitly delete unused objects and collect garbage
del df_raw, cleaned_df, df_with_indicators, df_with_vix, df_with_turbulence
gc.collect()


df_final = pd.DataFrame()
for ticker in TAI_0050_TICKER:
process_ticker_data(ticker)
# Load processed data from file and concatenate
    df_ticker = pd.read_csv(f"data_{ticker}.csv", index_col=0)  # index_col=0 skips the saved index
df_final = pd.concat([df_final, df_ticker], ignore_index=True)
del df_ticker # free up memory
gc.collect()

train = data_split(df_final, TRAIN_START_DATE, TRAIN_END_DATE)
trade = data_split(df_final, TRADE_START_DATE, TRADE_END_DATE)
train.to_csv("train_data.csv")
trade.to_csv("trade_data.csv")
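A possible follow-on step, sketched here and not part of this commit: turn a saved split back into the arrays a FinRL-style environment consumes, via SinopacProcessor.df_to_array (defined in processor_sinopac.py below, and assumed here to return price, tech, and turbulence arrays as other FinRL processors do). The credentials and indicator names are placeholders.

import pandas as pd

from processor_sinopac import SinopacProcessor

# Hypothetical usage sketch; substitute real Shioaji credentials and whichever
# lowercase TA-Lib indicator columns were kept (e.g. "rsi", "mom").
processor = SinopacProcessor(API_KEY="YOUR_API_KEY", API_SECRET="YOUR_API_SECRET")
train = pd.read_csv("train_data.csv", index_col=0)
price_array, tech_array, turbulence_array = processor.df_to_array(
    train, tech_indicator_list=["rsi", "mom"], if_vix=True
)
print(price_array.shape, tech_array.shape, turbulence_array.shape)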
121 changes: 66 additions & 55 deletions processor_sinopac.py
@@ -15,21 +15,17 @@
from stockstats import StockDataFrame as Sdf
from talib import abstract

from finrl.meta.preprocessor.shioajidownloader import sinopacDownloader
from shioajidownloader import SinopacDownloader


class SinopacProcessor:
def __init__(self, API_KEY=None, API_SECRET=None, api=None):
if api is None:
try:
self.api = sj.Shioaji()
print("API connected")
print("enter API_KEY and API_SECRET")
API_KEY = input("API_KEY: ")
API_SECRET = input("API_SECRET: ")
self.api.login(
API_KEY=API_KEY,
API_SECRET=API_SECRET,
api_key=API_KEY,
secret_key=API_SECRET,
contracts_cb=lambda security_type: print(
f"{repr(security_type)} fetch done."
),
@@ -40,16 +36,8 @@ def __init__(self, API_KEY=None, API_SECRET=None, api=None):
self.api = api

def download_data(self):
        # Build the ticker_list for SinopacDownloader
print("enter start date")
start_date = input("start date: ")
print("enter end date")
end_date = input("end date: ")
print("enter ticker list")
ticker_list = input("ticker list: ")
ticker_list = ticker_list.astype(str).split(",")
        # Create a SinopacDownloader instance
downloader = sinopacDownloader(
downloader = SinopacDownloader(
api=self.api,
start_date=self.start_date,
end_date=self.end_date,
@@ -62,33 +50,38 @@ def download_data(self):
@staticmethod
def clean_individual_ticker(args):
tic, df, times = args
        # Filter rows for this ticker and reset the index
tic_df = df[df["tic"] == tic].set_index("timestamp")

        # Create a new DataFrame to ensure all time points are included
tmp_df = pd.DataFrame(index=times)
tmp_df = tmp_df.join(
tic_df[["Open", "High", "Low", "Close", "Volume", "Amount"]], how="left"
)

        # Fill NaN values with the previous available value
tmp_df.fillna(method="ffill", inplace=True)
# Fill NaN values using forward fill
tmp_df.ffill(inplace=True)
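        # Rows earlier than the first observed bar stay NaN after the forward
        # fill, since there is nothing before them to fill from.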

        # Append ticker code and date
tmp_df["tic"] = tic
tmp_df["date"] = tmp_df.index.strftime("%Y-%m-%d")

tmp_df.reset_index(inplace=True)
tmp_df.rename(columns={"index": "timestamp"}, inplace=True)

return tmp_df

def clean_data(self, df):

print("Data cleaning started")
tic_list = df["tic"].unique()
n_tickers = len(tic_list)
self.start = df["timestamp"].min()
self.end = df["timestamp"].max()

        # Generate the full time index
start = pd.to_datetime(self.start_date)
end = pd.to_datetime(self.end_date)
        times = pd.date_range(start=start, end=end, freq="T")  # "T" is the minute-frequency alias
        times = pd.date_range(
            start=self.start, end=self.end, freq="min"
        )  # minute-level frequency ("min" replaces the deprecated "T" alias)

        # Process each ticker's data
results = []
@@ -98,12 +91,13 @@ def clean_data(self, df):

        # Merge the per-ticker results
new_df = pd.concat(results)

print(new_df.columns)
print("Data cleaning finished!")
return new_df.reset_index(drop=True)

def add_technical_indicator(self, df):
print("Started adding Indicators")
print(df.columns)
        tech_indicator_list = talib.get_functions()  # all indicators TA-Lib exposes

        # Adjust column names to match what TA-Lib expects
@@ -121,44 +115,61 @@ def add_technical_indicator(self, df):
        # Loop over every indicator and add it
        for indicator in tech_indicator_list:
            try:
                # Get the indicator function
                indicator_function = getattr(talib.abstract, indicator)
                # Compute the indicator
                result = indicator_function(df)

                # If the result is a Series, add it as a single column
                if isinstance(result, pd.Series):
                    df[indicator.lower()] = result
                else:  # if the result is a DataFrame, merge all of its columns
                    result.columns = [
                        f"{indicator.lower()}_{col}" for col in result.columns
                    ]
                    df = pd.concat([df, result], axis=1)
                if indicator == "MAVP":
                    # MAVP needs an extra per-row `periods` input, which the
                    # generic abstract call does not supply, so skip it
                    pass
                else:
                    # Get the indicator function
                    indicator_function = getattr(talib.abstract, indicator)
                    # Compute the indicator
                    result = indicator_function(df)

                    # If the result is a Series, add it as a single column
                    if isinstance(result, pd.Series):
                        df[indicator.lower()] = result
                    else:  # if the result is a DataFrame, merge all of its columns
                        result.columns = [
                            f"{indicator.lower()}_{col}" for col in result.columns
                        ]
                        df = pd.concat([df, result], axis=1)
except Exception as e:
print(f"Error calculating {indicator}: {str(e)}")
print(df.head())
        print(df.shape)
print(df.tail())
print("Finished adding Indicators")
df.rename(
columns={
"open": "Open",
"high": "High",
"low": "Low",
"close": "Close",
"volume": "Volume",
},
inplace=True,
)
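        # Restore the capitalized OHLCV names so that add_vix, calculate_turbulence,
        # and df_to_array (which key on "Close") operate on a consistent frame.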
print(df.columns)
return df

    # Allows the add_vix step to be multithreaded for quicker execution
def download_and_clean_data(self):
        # VIX index data starts at 2023-04-12
vix_kbars = self.api.kbars(
contract=self.api.Contracts.Indexs.TAIFEX["TAIFEXTAIWANVIX"],
start=self.start,
end=self.end,
start=self.start.strftime("%Y-%m-%d"),
end=self.end.strftime("%Y-%m-%d"),
)
vix_df = pd.DataFrame({**vix_kbars})
vix_df.ts = pd.to_datetime(vix_df.ts)
return self.clean_data(vix_df)
return vix_df
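        # Note: despite the method name, this now returns the raw kbars frame;
        # Shioaji kbars already use the capitalized OHLCV column names (plus
        # "ts"), which add_vix consumes directly.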

def add_vix(self, data):
cleaned_vix = self.download_and_clean_data()
vix = cleaned_vix[["ts", "close"]]
vix = vix.rename(columns={"ts": "timestamp", "close": "VIXY"})

vix = cleaned_vix[["ts", "Close"]]
vix = vix.rename(columns={"ts": "timestamp", "Close": "VIXY"})
print("Started adding VIX data")
print(vix.head())
print(data.columns)
if "timestamp" not in data.columns:
print("No timestamp column found")
data = data.copy()
data = data.merge(vix, on="timestamp")
data = data.sort_values(["timestamp", "tic"]).reset_index(drop=True)
@@ -168,7 +179,7 @@ def add_vix(self, data):
def calculate_turbulence(self, data, time_period=252):
# can add other market assets
df = data.copy()
df_price_pivot = df.pivot(index="timestamp", columns="tic", values="close")
df_price_pivot = df.pivot(index="timestamp", columns="tic", values="Close")
# use returns to calculate turbulence
df_price_pivot = df_price_pivot.pct_change()
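        # The turbulence index here follows FinRL's standard construction: the
        # Mahalanobis distance of the current returns y_t from their history,
        # d_t = (y_t - mu) @ inv(Sigma) @ (y_t - mu).T, with mu and Sigma
        # estimated over the trailing `time_period` window.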

@@ -234,7 +245,7 @@ def df_to_array(self, df, tech_indicator_list, if_vix):
if_first_time = True
for tic in unique_ticker:
if if_first_time:
price_array = df[df.tic == tic][["close"]].values
price_array = df[df.tic == tic][["Close"]].values
tech_array = df[df.tic == tic][tech_indicator_list].values
if if_vix:
turbulence_array = df[df.tic == tic]["VIXY"].values
Expand All @@ -243,7 +254,7 @@ def df_to_array(self, df, tech_indicator_list, if_vix):
if_first_time = False
else:
price_array = np.hstack(
[price_array, df[df.tic == tic][["close"]].values]
[price_array, df[df.tic == tic][["Close"]].values]
)
tech_array = np.hstack(
[tech_array, df[df.tic == tic][tech_indicator_list].values]
@@ -266,11 +277,11 @@ def on_tick(self, exchange: Exchange, tick: TickSTKv1):
tick_data = {
"timestamp": tick.datetime,
"tic": tick.code,
"open": float(tick.open),
"high": float(tick.high),
"low": float(tick.low),
"close": float(tick.close),
"volume": tick.volume,
"Open": float(tick.open),
"High": float(tick.high),
"Low": float(tick.low),
"Close": float(tick.close),
"Volume": tick.volume,
}
self.data = self.data.append(tick_data, ignore_index=True)
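        # Note: DataFrame.append was removed in pandas 2.0; on newer pandas use
        # self.data = pd.concat([self.data, pd.DataFrame([tick_data])], ignore_index=True)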

@@ -290,7 +301,7 @@ def resample_to_kbars(group):
group.set_index("timestamp", inplace=True)
ohlc_dict = {"price": "ohlc", "volume": "sum"}
kbars = group.resample("1T").apply(ohlc_dict)
kbars.columns = ["open", "high", "low", "close", "volume"]
kbars.columns = ["Open", "High", "Low", "Close", "Volume"]
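            # The ohlc/sum aggregation returns MultiIndex columns; the line
            # above flattens them to the five OHLCV names.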
return kbars

kbars_data = []
@@ -316,5 +327,5 @@
start=self.end_date,
end=self.end_date,
)
latest_turb = pd.DataFrame({**turb_df})["close"].values
latest_turb = pd.DataFrame({**turb_df})["Close"].values
return latest_price, latest_tech, latest_turb