last modified 2022.11.02
Bot Designes No.4です。
銘柄選定スクリプトで使ったライブラリを説明します。
今回はkabuステへの株式銘柄の登録に使うモジュールです。
kabuステには50銘柄まで登録できて、その登録された銘柄の情報を取得できます。
get_Token(gv.ipport, gv.kabupw)
ranking(gv.ipport, gv.x_api_key, t)
○cls_Symbols(gv.ipport, gv.x_api_key)
○set_Symbol(gv.ipport,gv.x_api_key,sc)
○unregister_symbol(gv.ipport,gv.x_api_key,sc)
get_board(gv.ipport, gv.x_api_key, sc, market)
get_symbol_info(gv.ipport, gv.x_api_key, sc)
最初に
#################################### ####################################
def cls_Symbols(ipport, x_api_key):
    """Unregister every symbol currently registered with kabu Station.

    Sends a body-less ``PUT`` to ``/kabusapi/unregister/all`` and prints the
    response status (or the error) to stdout.

    Args:
        ipport: Network location of the API server (host and port).  A
            leading ``//`` is accepted and is added when missing.
        x_api_key: API token sent in the ``X-API-KEY`` header.

    Returns:
        The decoded JSON body of the response.  For an HTTP error status the
        decoded error body is returned instead.  ``None`` is returned when no
        usable response was obtained (e.g. the server could not be reached or
        the success body was not valid JSON).
    """
    # Without "//" urllib cannot find a host in the URL, so add it when the
    # caller passed a bare "host:port".
    authority = f'{ipport}'
    if not authority.startswith('//'):
        authority = f'//{authority}'
    url = f'http:{authority}/kabusapi/unregister/all'

    req = urllib.request.Request(url, method='PUT')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-KEY', f'{x_api_key}')

    content = None
    try:
        with urllib.request.urlopen(req) as res:
            print(res.status, res.reason)
            print()
            content = json.loads(res.read())
    except urllib.error.HTTPError as e:
        # HTTP errors still carry a JSON body describing the failure.
        print(e)
        content = json.loads(e.read())
        pprint.pprint(content)
    except Exception as e:
        # Non-HTTP failures have no response body to read; report and fall
        # through with content left as None.
        print(e)
    # Short pause after every request, kept from the original implementation.
    time.sleep(0.1)
    return content
#################################### ####################################
#################################### ####################################
def _put_register(ipport, x_api_key, payload):
    """PUT *payload* as JSON to ``/kabusapi/register`` and decode the reply."""
    # Without "//" urllib cannot find a host in the URL, so add it when the
    # caller passed a bare "host:port".
    authority = f'{ipport}'
    if not authority.startswith('//'):
        authority = f'//{authority}'
    url = f'http:{authority}/kabusapi/register'

    json_data = json.dumps(payload).encode('utf8')
    req = urllib.request.Request(url, json_data, method='PUT')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-KEY', f'{x_api_key}')

    content = None
    try:
        with urllib.request.urlopen(req) as res:
            print(res.status, res.reason)
            print()
            content = json.loads(res.read())
    except urllib.error.HTTPError as e:
        # HTTP errors still carry a JSON body describing the failure.
        print(e)
        content = json.loads(e.read())
        pprint.pprint(content)
    except Exception as e:
        # Non-HTTP failures have no response body to read; report and fall
        # through with content left as None.
        print(e)
    # Short pause after every request, kept from the original implementation.
    time.sleep(0.1)
    return content


def set_Symbol(ipport, x_api_key, sc, exchange=1):
    """Register a single symbol with kabu Station.

    Args:
        ipport: Network location of the API server (host and port).  A
            leading ``//`` is accepted and is added when missing.
        x_api_key: API token sent in the ``X-API-KEY`` header.
        sc: Symbol code to register.
        exchange: Value sent as ``Exchange`` for the symbol; defaults to 1,
            the value that was previously hard-coded.

    Returns:
        The decoded JSON body of the response (the error body for an HTTP
        error status), or ``None`` when no usable response was obtained.
    """
    obj = {"Symbols": [{"Symbol": sc, "Exchange": exchange}]}
    return _put_register(ipport, x_api_key, obj)


def set_Symbols(ipport, x_api_key, obj_record):
    """Register several symbols with kabu Station in one request.

    Args:
        ipport: Network location of the API server (host and port).  A
            leading ``//`` is accepted and is added when missing.
        x_api_key: API token sent in the ``X-API-KEY`` header.
        obj_record: Request payload listing the symbols, for example::

            {
                "Symbols": [
                    {"Symbol": "9433", "Exchange": 1},
                    {"Symbol": "2038", "Exchange": 1}
                ]
            }

    Returns:
        The decoded JSON body of the response (the error body for an HTTP
        error status), or ``None`` when no usable response was obtained.
    """
    return _put_register(ipport, x_api_key, obj_record)
#################################### ####################################
#################################### ####################################
def unregister_symbol(ipport, x_api_key, symbol, exchange=1):
    """Remove a single symbol from the kabu Station registration list.

    Sends the symbol as a JSON payload via ``PUT`` to ``/kabusapi/unregister``
    and prints the response status (or the error) to stdout.

    Args:
        ipport: Network location of the API server (host and port).  A
            leading ``//`` is accepted and is added when missing.
        x_api_key: API token sent in the ``X-API-KEY`` header.
        symbol: Symbol code to unregister.
        exchange: Value sent as ``Exchange`` for the symbol; defaults to 1,
            the value that was previously hard-coded.

    Returns:
        The decoded JSON body of the response (the error body for an HTTP
        error status), or ``None`` when no usable response was obtained.
    """
    obj = {'Symbols': [{'Symbol': symbol, 'Exchange': exchange}]}
    json_data = json.dumps(obj).encode('utf8')

    # Without "//" urllib cannot find a host in the URL, so add it when the
    # caller passed a bare "host:port".
    authority = f'{ipport}'
    if not authority.startswith('//'):
        authority = f'//{authority}'
    url = f'http:{authority}/kabusapi/unregister'

    req = urllib.request.Request(url, json_data, method='PUT')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-KEY', f'{x_api_key}')

    content = None
    try:
        with urllib.request.urlopen(req) as res:
            print(res.status, res.reason)
            print()
            content = json.loads(res.read())
    except urllib.error.HTTPError as e:
        # HTTP errors still carry a JSON body describing the failure.
        print(e)
        content = json.loads(e.read())
        pprint.pprint(content)
    except Exception as e:
        # Non-HTTP failures have no response body to read; report and fall
        # through with content left as None.
        print(e)
    # Short pause after every request, kept from the original implementation.
    time.sleep(0.1)
    return content
#################################### ####################################