Source code for pyoanda.client

import json
import requests

from io import BytesIO
from time import sleep, time
from zipfile import ZipFile, BadZipfile
from logging import getLogger
from requests.exceptions import RequestException
try:
    from types import NoneType
except ImportError:
    NoneType = type(None)

from .exceptions import BadCredentials, BadRequest


log = getLogger(__name__)


[docs]class Client(object): API_VERSION = "v1" def __init__( self, environment, account_id=None, access_token=None, json_options=None ): self.domain, self.domain_stream = environment self.access_token = access_token self.account_id = account_id self.json_options = json_options or {} if account_id and not self.get_credentials(): raise BadCredentials()
[docs] def get_credentials(self): """ See more: http://developer.oanda.com/rest-live/accounts/ """ url = "{0}/{1}/accounts/{2}".format( self.domain, self.API_VERSION, self.account_id ) try: response = self._Client__call(uri=url) assert len(response) > 0 return response except RequestException: return False except AssertionError: return False
def __get_response(self, uri, params=None, method="get", stream=False): """Creates a response object with the given params and option Parameters ---------- url : string The full URL to request. params: dict A list of parameters to send with the request. This will be sent as data for methods that accept a request body and will otherwise be sent as query parameters. method : str The HTTP method to use. stream : bool Whether to stream the response. Returns a requests.Response object. """ if not hasattr(self, "session") or not self.session: self.session = requests.Session() if self.access_token: self.session.headers.update( {'Authorization': 'Bearer {}'.format(self.access_token)} ) # Remove empty params if params: params = {k: v for k, v in params.items() if v is not None} kwargs = { "url": uri, "verify": True, "stream": stream } kwargs["params" if method == "get" else "data"] = params return getattr(self.session, method)(**kwargs) def __call(self, uri, params=None, method="get"): """Only returns the response, nor the status_code """ try: resp = self.__get_response(uri, params, method, False) rjson = resp.json(**self.json_options) assert resp.ok except AssertionError: msg = "OCode-{}: {}".format(resp.status_code, rjson["message"]) raise BadRequest(msg) except Exception as e: msg = "Bad response: {}".format(e) log.error(msg, exc_info=True) raise BadRequest(msg) else: return rjson def __call_stream(self, uri, params=None, method="get"): """Returns an stream response """ try: resp = self.__get_response(uri, params, method, True) assert resp.ok except AssertionError: raise BadRequest(resp.status_code) except Exception as e: log.error("Bad response: {}".format(e), exc_info=True) else: return resp
[docs] def get_instruments(self): """ See more: http://developer.oanda.com/rest-live/rates/#getInstrumentList """ url = "{0}/{1}/instruments".format(self.domain, self.API_VERSION) params = {"accountId": self.account_id} try: response = self._Client__call(uri=url, params=params) assert len(response) > 0 return response except RequestException: return False except AssertionError: return False
[docs] def get_prices(self, instruments, stream=True): """ See more: http://developer.oanda.com/rest-live/rates/#getCurrentPrices """ url = "{0}/{1}/prices".format( self.domain_stream if stream else self.domain, self.API_VERSION ) params = {"accountId": self.account_id, "instruments": instruments} call = {"uri": url, "params": params, "method": "get"} try: if stream: return self._Client__call_stream(**call) else: return self._Client__call(**call) except RequestException: return False except AssertionError: return False
[docs] def get_instrument_history(self, instrument, candle_format="bidask", granularity='S5', count=500, daily_alignment=None, alignment_timezone=None, weekly_alignment="Monday", start=None, end=None): """ See more: http://developer.oanda.com/rest-live/rates/#retrieveInstrumentHistory """ url = "{0}/{1}/candles".format(self.domain, self.API_VERSION) params = { "accountId": self.account_id, "instrument": instrument, "candleFormat": candle_format, "granularity": granularity, "count": count, "dailyAlignment": daily_alignment, "alignmentTimezone": alignment_timezone, "weeklyAlignment": weekly_alignment, "start": start, "end": end, } try: return self._Client__call(uri=url, params=params, method="get") except RequestException: return False except AssertionError: return False
[docs] def get_orders(self, instrument=None, count=50): """ See more: http://developer.oanda.com/rest-live/orders/#getOrdersForAnAccount """ url = "{0}/{1}/accounts/{2}/orders".format( self.domain, self.API_VERSION, self.account_id ) params = {"instrument": instrument, "count": count} try: return self._Client__call(uri=url, params=params, method="get") except RequestException: return False except AssertionError: return False
[docs] def get_order(self, order_id): """ See more: http://developer.oanda.com/rest-live/orders/#getInformationForAnOrder """ url = "{0}/{1}/accounts/{2}/orders/{3}".format( self.domain, self.API_VERSION, self.account_id, order_id ) try: return self._Client__call(uri=url, method="get") except RequestException: return False except AssertionError: return False
[docs] def create_order(self, order): """ See more: http://developer.oanda.com/rest-live/orders/#createNewOrder """ url = "{0}/{1}/accounts/{2}/orders".format( self.domain, self.API_VERSION, self.account_id ) try: return self._Client__call( uri=url, params=order.__dict__, method="post" ) except RequestException: return False except AssertionError: return False
[docs] def update_order(self, order_id, order): """ See more: http://developer.oanda.com/rest-live/orders/#modifyExistingOrder """ url = "{0}/{1}/accounts/{2}/orders/{3}".format( self.domain, self.API_VERSION, self.account_id, order_id ) try: return self._Client__call( uri=url, params=order.__dict__, method="patch" ) except RequestException: return False except AssertionError: return False
[docs] def close_order(self, order_id): """ See more: http://developer.oanda.com/rest-live/orders/#closeOrder """ url = "{0}/{1}/accounts/{2}/orders/{3}".format( self.domain, self.API_VERSION, self.account_id, order_id ) try: return self._Client__call(uri=url, method="delete") except RequestException: return False except AssertionError: return False
[docs] def get_trades(self, max_id=None, count=None, instrument=None, ids=None): """ Get a list of open trades Parameters ---------- max_id : int The server will return trades with id less than or equal to this, in descending order (for pagination) count : int Maximum number of open trades to return. Default: 50 Max value: 500 instrument : str Retrieve open trades for a specific instrument only Default: all ids : list A list of trades to retrieve. Maximum number of ids: 50. No other parameter may be specified with the ids parameter. See more: http://developer.oanda.com/rest-live/trades/#getListOpenTrades """ url = "{0}/{1}/accounts/{2}/trades".format( self.domain, self.API_VERSION, self.account_id ) params = { "maxId": int(max_id) if max_id and max_id > 0 else None, "count": int(count) if count and count > 0 else None, "instrument": instrument, "ids": ','.join(ids) if ids else None } try: return self._Client__call(uri=url, params=params, method="get") except RequestException: return False except AssertionError: return False
[docs] def get_trade(self, trade_id): """ Get information on a specific trade. Parameters ---------- trade_id : int The id of the trade to get information on. See more: http://developer.oanda.com/rest-live/trades/#getInformationSpecificTrade """ url = "{0}/{1}/accounts/{2}/trades/{3}".format( self.domain, self.API_VERSION, self.account_id, trade_id ) try: return self._Client__call(uri=url, method="get") except RequestException: return False except AssertionError: return False
[docs] def update_trade( self, trade_id, stop_loss=None, take_profit=None, trailing_stop=None ): """ Modify an existing trade. Note: Only the specified parameters will be modified. All other parameters will remain unchanged. To remove an optional parameter, set its value to 0. Parameters ---------- trade_id : int The id of the trade to modify. stop_loss : number Stop Loss value. take_profit : number Take Profit value. trailing_stop : number Trailing Stop distance in pips, up to one decimal place See more: http://developer.oanda.com/rest-live/trades/#modifyExistingTrade """ url = "{0}/{1}/accounts/{2}/trades/{3}".format( self.domain, self.API_VERSION, self.account_id, trade_id ) params = { "stopLoss": stop_loss, "takeProfit": take_profit, "trailingStop": trailing_stop } try: return self._Client__call(uri=url, params=params, method="patch") except RequestException: return False except AssertionError: return False raise NotImplementedError()
[docs] def close_trade(self, trade_id): """ Close an open trade. Parameters ---------- trade_id : int The id of the trade to close. See more: http://developer.oanda.com/rest-live/trades/#closeOpenTrade """ url = "{0}/{1}/accounts/{2}/trades/{3}".format( self.domain, self.API_VERSION, self.account_id, trade_id ) try: return self._Client__call(uri=url, method="delete") except RequestException: return False except AssertionError: return False
[docs] def get_positions(self): """ Get a list of all open positions. See more: http://developer.oanda.com/rest-live/positions/#getListAllOpenPositions """ url = "{0}/{1}/accounts/{2}/positions".format( self.domain, self.API_VERSION, self.account_id ) try: return self._Client__call(uri=url, method="get") except RequestException: return False except AssertionError: return False
[docs] def get_position(self, instrument): """ Get the position for an instrument. Parameters ---------- instrument : string The instrument to get the open position for. See more: http://developer.oanda.com/rest-live/positions/#getPositionForInstrument """ url = "{0}/{1}/accounts/{2}/positions/{3}".format( self.domain, self.API_VERSION, self.account_id, instrument ) try: return self._Client__call(uri=url, method="get") except RequestException: return False except AssertionError: return False
[docs] def close_position(self, instrument): """ Close an existing position Parameters ---------- instrument : string The instrument to close the position for. See more: http://developer.oanda.com/rest-live/positions/#closeExistingPosition """ url = "{0}/{1}/accounts/{2}/positions/{3}".format( self.domain, self.API_VERSION, self.account_id, instrument ) try: return self._Client__call(uri=url, method="delete") except RequestException: return False except AssertionError: return False
[docs] def get_transactions( self, max_id=None, count=None, instrument="all", ids=None ): """ Get a list of transactions. Parameters ---------- max_id : int The server will return transactions with id less than or equal to this, in descending order (for pagination). count : int Maximum number of open transactions to return. Default: 50. Max value: 500. instrument : str Retrieve open transactions for a specific instrument only. Default: all. ids : list A list of transactions to retrieve. Maximum number of ids: 50. No other parameter may be specified with the ids parameter. See more: http://developer.oanda.com/rest-live/transaction-history/#getTransactionHistory http://developer.oanda.com/rest-live/transaction-history/#transactionTypes """ url = "{0}/{1}/accounts/{2}/transactions".format( self.domain, self.API_VERSION, self.account_id ) params = { "maxId": int(max_id) if max_id and max_id > 0 else None, "count": int(count) if count and count > 0 else None, "instrument": instrument, "ids": ','.join(ids) if ids else None } try: return self._Client__call(uri=url, params=params, method="get") except RequestException: return False except AssertionError: return False
[docs] def get_transaction(self, transaction_id): """ Get information on a specific transaction. Parameters ---------- transaction_id : int The id of the transaction to get information on. See more: http://developer.oanda.com/rest-live/transaction-history/#getInformationForTransaction http://developer.oanda.com/rest-live/transaction-history/#transactionTypes """ url = "{0}/{1}/accounts/{2}/transactions/{3}".format( self.domain, self.API_VERSION, self.account_id, transaction_id ) try: return self._Client__call(uri=url, method="get") except RequestException: return False except AssertionError: return False
[docs] def request_transaction_history(self): """ Request full account history. Submit a request for a full transaction history. A successfully accepted submission results in a response containing a URL in the Location header to a file that will be available once the request is served. Response for the URL will be HTTP 404 until the file is ready. Once served the URL will be valid for a certain amount of time. See more: http://developer.oanda.com/rest-live/transaction-history/#getFullAccountHistory http://developer.oanda.com/rest-live/transaction-history/#transactionTypes """ url = "{0}/{1}/accounts/{2}/alltransactions".format( self.domain, self.API_VERSION, self.account_id ) try: resp = self.__get_response(url) return resp.headers['location'] except RequestException: return False except AssertionError: return False
[docs] def get_transaction_history(self, max_wait=5.0): """ Download full account history. Uses request_transaction_history to get the transaction history URL, then polls the given URL until it's ready (or the max_wait time is reached) and provides the decoded response. Parameters ---------- max_wait : float The total maximum time to spend waiting for the file to be ready; if this is exceeded a failed response will be returned. This is not guaranteed to be strictly followed, as one last attempt will be made to check the file before giving up. See more: http://developer.oanda.com/rest-live/transaction-history/#getFullAccountHistory http://developer.oanda.com/rest-live/transaction-history/#transactionTypes """ url = self.request_transaction_history() if not url: return False ready = False start = time() delay = 0.1 while not ready and delay: response = requests.head(url) ready = response.ok if not ready: sleep(delay) time_remaining = max_wait - time() + start max_delay = max(0., time_remaining - .1) delay = min(delay * 2, max_delay) if not ready: return False response = requests.get(url) try: with ZipFile(BytesIO(response.content)) as container: files = container.namelist() if not files: log.error('Transaction ZIP has no files.') return False history = container.open(files[0]) raw = history.read().decode('ascii') except BadZipfile: log.error('Response is not a valid ZIP file', exc_info=True) return False return json.loads(raw, **self.json_options)
[docs] def create_account(self, currency=None): """ Create a new account. This call is only available on the sandbox system. Please create accounts on fxtrade.oanda.com on our production system. See more: http://developer.oanda.com/rest-sandbox/accounts/#-a-name-createtestaccount-a-create-a-test-account """ url = "{0}/{1}/accounts".format(self.domain, self.API_VERSION) params = {"currency": currency} try: return self._Client__call(uri=url, params=params, method="post") except RequestException: return False except AssertionError: return False
[docs] def get_accounts(self, username=None): """ Get a list of accounts owned by the user. Parameters ---------- username : string The name of the user. Note: This is only required on the sandbox, on production systems your access token will identify you. See more: http://developer.oanda.com/rest-sandbox/accounts/#-a-name-getaccountsforuser-a-get-accounts-for-a-user """ url = "{0}/{1}/accounts".format(self.domain, self.API_VERSION) params = {"username": username} try: return self._Client__call(uri=url, params=params, method="get") except RequestException: return False except AssertionError: return False