Usage Examples

The LedgerX API Python SDK is simple to use, but it helps to cover a few cornerstone concepts of the API first.

Library Context

The Context class is where the main I/O loop resides and where message dispatch takes place. It represents the context for the entire library and must be initialized with a subclass of MessageProcessor that implements all of the abstract methods for responding to messages received from the exchange.

This class implements an interface similar to Python’s Thread: its start, stop, and join methods behave as you would expect of a thread.
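As a rough illustration of what a Thread-like start/stop/join lifecycle implies for calling code, the following stand-alone sketch mimics it with the standard library only. SketchContext and everything inside it are hypothetical stand-ins, not the SDK’s Context; only the calling pattern is meant to carry over.

```python
import threading
import time


class SketchContext:
    """Hypothetical stand-in that mimics a Thread-like lifecycle."""

    def __init__(self):
        self._stop_requested = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.iterations = 0

    def _run(self):
        # Stand-in for an I/O loop: spin until stop() is requested.
        while not self._stop_requested.is_set():
            self.iterations += 1
            time.sleep(0.01)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop_requested.set()

    def join(self, timeout=None):
        self._thread.join(timeout)

    def is_alive(self):
        return self._thread.is_alive()


ctx = SketchContext()
ctx.start()        # returns immediately; the loop runs in the background
time.sleep(0.05)   # the caller is free to do other work here
ctx.stop()         # ask the loop to exit
ctx.join(1)        # wait (at most 1 second) for it to finish
print("loop still running:", ctx.is_alive())
```

The point of the pattern is that start returns at once, so the calling thread stays free to issue commands, and join with a timeout bounds how long shutdown can block.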

Some methods on Context, such as submit, exist for extensibility and are not meant to be called directly.

Context expects the FQDN of the exchange endpoint to be specified as its first argument. You can specify ledgerx.sdk.settings.EXCHANGE_CTE_FQDN to have the context connect to the Customer Test Environment (CTE), or use ledgerx.sdk.settings.EXCHANGE_TRADE_FQDN to trade live on the market.

Message Processing

MessageProcessor is an abstract class that defines the interface contract for handling messages and high-level socket state events. Subclasses must implement all of its abstract methods.
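To make the idea of an abstract interface contract concrete, here is a minimal sketch built on Python’s abc module. It assumes the contract is enforced the way abc enforces it, which the SDK documentation does not confirm; SketchProcessor, Incomplete, and Complete are hypothetical names, and only the two handler names are borrowed from the SDK.

```python
from abc import ABC, abstractmethod


class SketchProcessor(ABC):
    """Hypothetical base class with two abstract handlers."""

    @abstractmethod
    def handle_heartbeat(self, msg, prev_msg):
        ...

    @abstractmethod
    def handle_status_message(self, msg, prev_msg):
        ...


class Incomplete(SketchProcessor):
    # Only one of the two handlers is implemented.
    def handle_heartbeat(self, msg, prev_msg):
        pass


class Complete(SketchProcessor):
    def handle_heartbeat(self, msg, prev_msg):
        pass

    def handle_status_message(self, msg, prev_msg):
        pass


try:
    Incomplete()
    incomplete_ok = True
except TypeError:
    # abc refuses to instantiate a class with unimplemented abstract methods.
    incomplete_ok = False

complete = Complete()
print("Incomplete instantiable:", incomplete_ok)
```

Under this assumption, forgetting a handler surfaces as an error at instantiation time rather than later, when the corresponding message first arrives.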

Logging

By default the SDK does not write logs to stdout; you must enable stream logging to see any logs from the SDK. To do so, add a logging.StreamHandler to your logger in your application.

import sys
import logging

logger = logging.getLogger()
logger.addHandler(logging.StreamHandler(sys.stdout))
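Python loggers only emit records at or above their effective level, which is WARNING unless configured otherwise, so a handler alone may not show lower-severity messages. The sketch below additionally sets a level and a formatter. The logger name "ledgerx" is an assumption based on the package name, not something the SDK documentation states; use logging.getLogger() to configure the root logger instead.

```python
import logging
import sys

# "ledgerx" is an assumed logger name; logging.getLogger() targets the root.
logger = logging.getLogger("ledgerx")
logger.setLevel(logging.DEBUG)  # let DEBUG and INFO records through

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
)
logger.addHandler(handler)

logger.debug("stream logging enabled")
```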

Key Pair Object

The KeyPair class represents an EC key pair: it stores the private and public keys used by the 0MQ ZAP/CURVE mechanism.

This class also supports generating new key pairs through generate(), and saving and loading key pair certificates through save_certificate() and load_certificate().

Fetching Exchange’s Public Key

To fetch the Customer Test Environment exchange public key you can issue the following command:

$ pyledgerx --fetch-exchange-key test.ledgerx.com

Specifying a different domain name (e.g., trade.ledgerx.com) will fetch a different exchange’s public key.

Initializing Context Example

from ledgerx.protocol.crypto import KeyPair

from ledgerx.sdk import settings
from ledgerx.sdk.context import Context
from ledgerx.sdk.processor import MessageProcessor

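class MyMessageProcessor(MessageProcessor):
    # Placeholder only: a working subclass must implement every
    # abstract handler (see the next example).
    pass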

client_keypair = KeyPair.generate()
server_public_key = b'somepublickey'
context = Context(settings.EXCHANGE_CTE_FQDN, client_keypair.private,
    client_keypair.public, server_public_key, MyMessageProcessor)

context.start()

Submitting and Handling Messages Example

from ledgerx.protocol.crypto import KeyPair

from ledgerx.sdk import settings
from ledgerx.sdk.context import Context
from ledgerx.sdk.commands import get_cmd_version
from ledgerx.sdk.processor import MessageProcessor

class MyMessageProcessor(MessageProcessor):
    def handle_run_change(self, run_id, prev_run_id):
        # If this handler is invoked, open orders
        # are considered canceled and should be
        # purged from your local state
        pass
    def handle_action_report(self, msg, prev_msg):
        # If this is the first action report, prev_msg will be
        # the limit_order command's message.
        pass
    def handle_contract_detail(self, msg, prev_msg): pass
    def handle_status_message(self, msg, prev_msg):
        # You can handle all status messages here (e.g., ACK messages).
        pass
    def handle_batch_message(self, msg, prev_msg):
        # Handle all batch messages here. A batch message
        # arrives as a batch of ACKs in reply to a batch
        # that you sent.
        pass
    def handle_book_top(self, msg, prev_msg):
        # This handler is for the book top message for the limit order.
        pass
    def handle_book_state_snapshot(self, msg, prev_msg): pass
    def handle_heartbeat(self, msg, prev_msg): pass

client_keypair = KeyPair.generate()
server_public_key = b'somepublickey'
context = Context(settings.EXCHANGE_CTE_FQDN, client_keypair.private,
    client_keypair.public, server_public_key, MyMessageProcessor)

context.start()

cmd = get_cmd_version('0.0.1').Commands(context)
cmd.limit_order()

context.join()

Trading Session Example

import time
from random import randint

from ledgerx.protocol.crypto import KeyPair

from ledgerx.sdk import settings
from ledgerx.sdk.context import Context
from ledgerx.sdk.commands import get_cmd_version
from ledgerx.sdk.processor import MessageProcessor

# A list of contracts on the exchange
contracts = []

class MyMessageProcessor(MessageProcessor):

    def handle_run_change(self, run_id, prev_run_id):
        # If this handler is invoked, open orders
        # are considered canceled and should be
        # purged from your local state
        pass

    def handle_action_report(self, msg, prev_msg):
        # If this is the first action report, prev_msg will be
        # the limit_order command's message.
        print(msg, prev_msg)

    def handle_contract_detail(self, msg, prev_msg):
        contracts.append(msg)

    def handle_status_message(self, msg, prev_msg):
        # You should receive an ACK status message for the get_contract
        # call then another ACK message for the limit order submitted. You
        # might receive other status messages depending on the parameters
        # passed to the commands you issued.
        print(msg.status, msg.message, prev_msg)

    def handle_batch_message(self, msg, prev_msg):
        pass

    def handle_book_top(self, msg, prev_msg):
        print(msg, prev_msg)

    def handle_book_state_snapshot(self, msg, prev_msg):
        # This will receive the MessageBookStateSnapshot that contains all
        # resting limit orders on a contract. prev_msg will be the
        # MessageGetBookState request message.
        print(msg, prev_msg)

    def handle_heartbeat(self, msg, prev_msg):
        # Upon receiving a heartbeat, note the `interval_ms` property.
        # This value is the number of milliseconds after which you should
        # have received the next heartbeat message. If you do not receive
        # another heartbeat after this interval has elapsed, you may assume
        # the exchange went offline.
        print(msg, prev_msg)

class StoreMIDCallback(object):
    """\
    A callback to store the generated Message ID (MID) which is also the
    ID that points to an entry on the exchange's CLOB.
    """
    def __init__(self):
        self._mid = None

    def __call__(self, msg, prev_msg):
        self._mid = msg.mid

    @property
    def entry_id(self):
        return self._mid

# You have to upload your public key to your account
client_keypair = KeyPair.generate()

# Acquire server's public key and set it here
server_public_key = b'somepublickey'

# Establish context
context = Context(settings.EXCHANGE_CTE_FQDN, client_keypair.private,
    client_keypair.public, server_public_key, MyMessageProcessor)

context.start()

cmd = get_cmd_version('0.0.1').Commands(context)

# Acquire a list of contracts on the exchange
cmd.get_contract(all_contracts=True)
time.sleep(3) # allow enough time to build the contracts list

if not contracts:
    print("loading contracts failed")
    # Timeout after 1 second in case 0MQ lingering option was set and there
    # are messages waiting to be processed. We don't want those in case we
    # need to exit.
    context.join(1)
    raise SystemExit

# Pick a contract at random
contract = contracts[randint(0, len(contracts) - 1)]

# Submit a limit order
sm_cb = StoreMIDCallback()
item = cmd.limit_order(size=1, price_in_cents=5000,
    contract_id=contract.contract_id)
item.callback = sm_cb
time.sleep(3) # allow enough time to get a reply with the Entry ID

print("Order's ID: {}".format(sm_cb.entry_id))

# Timeout after 3 seconds just in case there are lingering messages that
# need to be consumed from 0MQ internal queues.
context.join(3)
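The handle_heartbeat comments above describe a liveness check: each heartbeat announces, through interval_ms, how long to wait for the next one. One way to track that is sketched below. HeartbeatWatchdog and its grace_ms parameter are hypothetical, not part of the SDK; the injectable clock exists only to make the example deterministic.

```python
import time


class HeartbeatWatchdog:
    """Hypothetical helper: tracks when the next heartbeat is due."""

    def __init__(self, grace_ms=0, clock=time.monotonic):
        self._clock = clock
        self._grace_s = grace_ms / 1000.0
        self._deadline = None  # no heartbeat seen yet

    def beat(self, interval_ms):
        # Call from handle_heartbeat with msg.interval_ms.
        self._deadline = self._clock() + interval_ms / 1000.0 + self._grace_s

    def is_overdue(self):
        # Before the first heartbeat nothing can be overdue.
        return self._deadline is not None and self._clock() > self._deadline


# Usage with a fake clock so the behaviour is easy to follow.
now = [100.0]
watchdog = HeartbeatWatchdog(grace_ms=500, clock=lambda: now[0])
watchdog.beat(interval_ms=1000)  # next heartbeat due within 1.5 s
now[0] += 1.0
on_time = watchdog.is_overdue()  # 1.0 s elapsed: still within the window
now[0] += 1.0
late = watchdog.is_overdue()     # 2.0 s elapsed: the heartbeat is overdue
print(on_time, late)
```

In an application, a timer or the main loop would poll is_overdue() and treat a True result as a sign that the exchange may be offline. The grace period is a design choice to absorb network jitter, not something the exchange prescribes.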

Orders Batch Example

import time
from random import randint

from ledgerx.protocol.crypto import KeyPair

from ledgerx.sdk import settings
from ledgerx.sdk.context import Context
from ledgerx.sdk.commands import get_cmd_version
from ledgerx.sdk.processor import MessageProcessor
from ledgerx.api.client import v0_0_1 as api

# A list of contracts on the exchange
contracts = []

class MyMessageProcessor(MessageProcessor):

    def handle_run_change(self, run_id, prev_run_id):
        # If this handler is invoked, open orders
        # are considered canceled and should be
        # purged from your local state
        pass

    def handle_action_report(self, msg, prev_msg):
        # This handler receives action reports for any orders
        # that were placed, even though action reports are also
        # delivered to the callback for the order message
        # (if any exist).
        pass

    def handle_contract_detail(self, msg, prev_msg):
        contracts.append(msg)

    def handle_status_message(self, msg, prev_msg):
        # You should receive an ACK status message for the get_contract
        # call then another ACK message for the batch message sent.
        # You might receive other status messages depending on
        # the parameters passed to the commands you issued.
        pass

    def handle_batch_message(self, msg, prev_msg):
        # You should receive a batch message of acks for the
        # batch message sent.
        for ack in msg.messages:
            # do something with an ack
            continue

    def handle_book_top(self, msg, prev_msg):
        pass

    def handle_book_state_snapshot(self, msg, prev_msg):
        # This will receive the MessageBookStateSnapshot that contains all
        # resting limit orders on a contract. prev_msg will be the
        # MessageGetBookState request message.
        pass

    def handle_heartbeat(self, msg, prev_msg):
        # Upon receiving a heartbeat, note the `interval_ms` property.
        # This value is the number of milliseconds after which you should
        # have received the next heartbeat message. If you do not receive
        # another heartbeat after this interval has elapsed, you may assume
        # the exchange went offline.
        pass

class ProcessOrderResponses(object):
    """\
    A callback to process order responses
    Responses might be:
    - MessageStatus
    - MessageActionReport
    """
    def __init__(self):
        self._mid = None

    def __call__(self, msg, prev_msg):
        if isinstance(msg, api.MessageStatus):
            self.process_status(msg)
        else:
            self.process_ar(msg)

    def process_ar(self, msg):
        """
        process an action report
        """
        pass

    def process_status(self, msg):
        """
        process a status message
        """
        if msg.status == api.MessageStatus.STATUS_ORDER_SUCCESS:
            print("Order {} placed successfully".format(msg.mid))
        else:
            print("Order {} failed with code {}".format(msg.mid, msg.status))

# You have to upload your public key to your account
client_keypair = KeyPair.generate()

# Acquire server's public key and set it here
server_public_key = b'somepublickey'

# Establish context
context = Context(settings.EXCHANGE_CTE_FQDN, client_keypair.private,
    client_keypair.public, server_public_key, MyMessageProcessor)

context.start()

cmd = get_cmd_version('0.0.1').Commands(context)

# Acquire a list of contracts on the exchange
cmd.get_contract(all_contracts=True)
time.sleep(3) # allow enough time to build the contracts list

if not contracts:
    print("loading contracts failed")
    # Timeout after 1 second in case 0MQ lingering option was set and there
    # are messages waiting to be processed. We don't want those in case we
    # need to exit.
    context.join(1)
    raise SystemExit

# Pick a contract at random
def random_contract():
    return contracts[randint(0, len(contracts) - 1)]

# Create a set of limit orders.
# Note: the messages in a batch need not all be of the same type.
orders = []
for _ in range(10):
    contract = random_contract()
    order = api.MessageLimitOrder()
    order.size = 1
    order.price_in_cents = 100
    order.contract_id = contract.contract_id
    orders.append(order)

(batch_item, items) = cmd.batch(messages=orders)
for idx, item in enumerate(items):
    item.callback = ProcessOrderResponses()
    print("Order no.{} ID: {}".format(idx, item.msg.mid))

time.sleep(2) # Wait for replies

# Timeout after 3 seconds just in case there are lingering messages that
# need to be consumed from 0MQ internal queues.
context.join(3)
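The examples above wait for replies with fixed time.sleep calls, which either waste time or return too early. A possible alternative is to let the handler signal arrival through a threading.Event. ContractCollector below is a hypothetical helper, not part of the SDK, and it assumes handlers run on a thread other than the caller’s. It only signals that at least one contract has arrived; knowing that the list is complete would need a signal from the exchange that this document does not describe.

```python
import threading


class ContractCollector:
    """Hypothetical helper: collects contract messages and signals arrival."""

    def __init__(self):
        self._lock = threading.Lock()
        self._contracts = []
        self._first_arrived = threading.Event()

    def add(self, msg):
        # Call from handle_contract_detail.
        with self._lock:
            self._contracts.append(msg)
        self._first_arrived.set()

    def wait(self, timeout):
        # Returns True as soon as one contract has arrived, False on timeout.
        return self._first_arrived.wait(timeout)

    def snapshot(self):
        # Return a copy so callers never see the list mid-update.
        with self._lock:
            return list(self._contracts)


collector = ContractCollector()

# Stand-in for the I/O thread delivering a contract shortly after the request.
threading.Timer(0.05, collector.add, args=("contract-1",)).start()

arrived = collector.wait(timeout=3)  # returns once the contract is delivered
print(arrived, collector.snapshot())
```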

Run Change Example

import time
from random import randint

from ledgerx.protocol.crypto import KeyPair

from ledgerx.sdk import settings
from ledgerx.sdk.context import Context
from ledgerx.sdk.commands import get_cmd_version
from ledgerx.sdk.processor import MessageProcessor

# A list of contracts on the exchange
contracts = []
orders = []

class MyMessageProcessor(MessageProcessor):

    def handle_run_change(self, run_id, prev_run_id):
        # If this handler is invoked, open orders
        # are considered canceled and should be
        # purged from your local state
        global orders
        orders = []

    def handle_action_report(self, msg, prev_msg):
        # If this is the first action report, prev_msg will be
        # the limit_order command's message.
        print(msg, prev_msg)

    def handle_contract_detail(self, msg, prev_msg):
        contracts.append(msg)

    def handle_status_message(self, msg, prev_msg):
        # You should receive an ACK status message for the get_contract
        # call then another ACK message for the limit order submitted. You
        # might receive other status messages depending on the parameters
        # passed to the commands you issued.
        print(msg.status, msg.message, prev_msg)

    def handle_batch_message(self, msg, prev_msg):
        pass

    def handle_book_top(self, msg, prev_msg):
        print(msg, prev_msg)

    def handle_book_state_snapshot(self, msg, prev_msg):
        # This will receive the MessageBookStateSnapshot that contains all
        # resting limit orders on a contract. prev_msg will be the
        # MessageGetBookState request message.
        print(msg, prev_msg)

    def handle_heartbeat(self, msg, prev_msg):
        # Upon receiving a heartbeat, note the `interval_ms` property.
        # This value is the number of milliseconds after which you should
        # have received the next heartbeat message. If you do not receive
        # another heartbeat after this interval has elapsed, you may assume
        # the exchange went offline.
        print(msg, prev_msg)

class StoreMIDCallback(object):
    """\
    A callback to store the generated Message ID (MID) which is also the
    ID that points to an entry on the exchange's CLOB.
    """
    def __init__(self):
        self._mid = None

    def __call__(self, msg, prev_msg):
        self._mid = msg.mid
        orders.append(msg)

    @property
    def entry_id(self):
        return self._mid

# You have to upload your public key to your account
client_keypair = KeyPair.generate()

# Acquire server's public key and set it here
server_public_key = b'somepublickey'

# Establish context
context = Context(settings.EXCHANGE_CTE_FQDN, client_keypair.private,
    client_keypair.public, server_public_key, MyMessageProcessor)

context.start()

cmd = get_cmd_version('0.0.1').Commands(context)

# Acquire a list of contracts on the exchange
cmd.get_contract(all_contracts=True)
time.sleep(3) # allow enough time to build the contracts list

if not contracts:
    print("loading contracts failed")
    # Timeout after 1 second in case 0MQ lingering option was set and there
    # are messages waiting to be processed. We don't want those in case we
    # need to exit.
    context.join(1)
    raise SystemExit

# Pick a contract at random
contract = contracts[randint(0, len(contracts) - 1)]

# Submit a limit order
sm_cb = StoreMIDCallback()
item = cmd.limit_order(size=1, price_in_cents=5000,
    contract_id=contract.contract_id)
item.callback = sm_cb
time.sleep(3) # allow enough time to get a reply with the Entry ID

print("Order's ID: {}".format(sm_cb.entry_id))

# Timeout after 3 seconds just in case there are lingering messages that
# need to be consumed from 0MQ internal queues.
context.join(3)
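The example above keeps open orders in a module-level list that is reset on a run change. If handlers and callbacks run on the context’s thread while the main thread also reads the list (an assumption; the threading model is not spelled out here), a small lock-protected store may be safer. OpenOrders is a hypothetical helper, not part of the SDK.

```python
import threading


class OpenOrders:
    """Hypothetical thread-safe store of open orders keyed by message ID."""

    def __init__(self):
        self._lock = threading.Lock()
        self._by_mid = {}
        self.run_id = None

    def add(self, mid, order):
        # Call from the order callback once the order is acknowledged.
        with self._lock:
            self._by_mid[mid] = order

    def on_run_change(self, run_id):
        # Open orders from the previous run are considered canceled.
        with self._lock:
            purged = len(self._by_mid)
            self._by_mid.clear()
            self.run_id = run_id
        return purged

    def __len__(self):
        with self._lock:
            return len(self._by_mid)


store = OpenOrders()
store.add("mid-1", {"size": 1, "price_in_cents": 5000})
store.add("mid-2", {"size": 1, "price_in_cents": 5100})
purged = store.on_run_change(run_id=2)
print(purged, len(store), store.run_id)
```

With such a store, handle_run_change would call on_run_change(run_id) and the order callback would call add(msg.mid, msg), replacing the global statements.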