XROAD
Разработка системы на C

C SDK

Для разработки нод силами клиента, используется SDK

Рис. 1
  • bin - здесь лежит скрипт, который компилирует правила роутинга config/dll.c в libdll.so (см. Конфигурация системы)
  • include/cache - реализация работы со встроенным хранилищем данных
  • include/common - реализация работы с контейнерами, строками, xml и т.д. Требует линковку с libcommon.so
  • include/db_engine - библиотека, реализующая работу с PostgreSQL сервером
  • include/fix - библиотека, реализующая работу с FIX протоколом
  • include/http - библиотека, упрощающая работу c HTTP протоколом (через libcurl)
  • include/instrdb - библиотека, упрощающая работу со справочником инструментов. Требует линковку с libinstrdb.so
  • include/kafka - библиотека для работы с Kafka (через librdkafka)
  • include/logger - реализация бинарного логгера. Требует линковку с liblogger.so или liblogger_stdout.so
  • include/mdata - реализация работы с рыночными данными. Требует линковку с libmdata.so
  • include/net - реализация работы с сетью. Требует линковку с libnet.so
  • include/node - основная библиотека, реализующая функционал ноды. Требует линковку с libnode.so
  • include/order - реализация работы с ордерами. Требует линковку с liborder.so
  • include/props - работа со свойствами ноды (пока не используется)
  • include/rabbit - реализует протокол взаимодействия с RabbitMQ брокером
  • include/strategy - реализация враппера, который скрывает детали инициализации и подписки на рыночные данные, а
  • include/ui - определяет интерфейс для работы с WebUI
  • include/websocket - враппер для работы с websockets так же мпозволяет создавать ордера используя конфигурационный файл
  • lib - библиотеки для линковки
  • python/xroad - библиотеки для создания нод на языке Python

Hello, world!

Начнем разработку робота c coздания минимальной заглушки, которая будет просто компилироваться. Для этого создадим robot.c и запишем в него следующее:

1 #include <common/xroad_process.h>
2 #include <stdint.h>
3 
4 int32_t main(int argc, char *argv[])
5 {
6  xroad_process_iface_t piface =
7  {
8  };
9  return xroad_process(argc, argv, piface);
10 }
Definition: xroad_process.h:25
int32_t xroad_process(int32_t argc, char *argv[], xroad_process_iface_t piface)

Сохраним и попробуем скомпилировать из командной строки:

gcc -O3 -g robot.c -Isdk/include -I/usr/include/libxml2 -Lsdk/lib -lcommon -lnode -o robot

если компиляция прошла успешно, то должен создаться исполняемый файл robot. Можно проверить с какой версией SDK он был собран:

bin/robot -v
61.0.be2a2a3 debug uncommited X86-64

видно, что он собран с версией SDK 61.0

Добавление callback функций

Данная версия, кроме того, чтобы печатать версию, больше ничего не умеет и не будет работать в системе, так как она не имеет обязательных обработчиков событий. Поэтому добавим их:

1 #include <common/xroad_process.h>
2 #include <node/xroad_node.h>
3 #include <logger/xroad_logger.h>
4 #include <stdint.h>
5 #include <stdbool.h>
6 
7 struct process_s
8 {
9  bool exit;
10 };
11 
12 process_t* proc_create(int32_t argc, char* argv[])
13 {
14  process_t* proc = calloc(1, sizeof(process_t));
15  return proc;
16 }
17 
18 void proc_destroy(process_t* proc)
19 {
20  free(proc);
21 }
22 
23 void proc_start(process_t* proc)
24 {
26 }
27 
28 void proc_stop(process_t* proc)
29 {
31 }
32 
33 void proc_shutdown(process_t* proc)
34 {
35  proc_stop(proc);
36  proc->exit = true;
37 }
38 
39 void proc_on_node_object(process_t* proc, void* obj, xroad_node_id_t from)
40 {
41  xroad_log_debug("%O received from %u node", obj, from);
42 }
43 
44 int32_t proc_enter_loop(process_t* proc)
45 {
47  return !proc->exit;
48 }
49 
50 int32_t main(int argc, char *argv[])
51 {
52  xroad_process_iface_t piface =
53  {
54  .create = proc_create,
55  .destroy = proc_destroy,
56  .start = proc_start,
57  .stop = proc_stop,
58  .shutdown = proc_shutdown,
59  .on_node_object = proc_on_node_object,
60  .enter_loop = proc_enter_loop
61  };
62  return xroad_process(argc, argv, piface);
63 }
xroad_node_status_t status
node status
Definition: xroad_system.h:138
process_t *(* create)(int32_t, char *[])
Definition: xroad_process.h:32
#define xroad_log_debug(fmt,...)
Definition: xroad_logger.h:196
int32_t xroad_node_receive(void)
xroad_node_data_t * xroad_node_get_data(void)
@ xroad_node_status_active
node is active and works
Definition: xroad_system.h:29
@ xroad_node_status_offline
node started, but is not active
Definition: xroad_system.h:30

Прежде всего добавим необходимые заголовки (2,3,5). Т.к. мы уже начнем использовать логгер, то нам понадобится xroad_logger.h (3).

  • 7-10 - необходимо создать структуру process_s, которая будет хранить состояние процесса. Важно, чтобы имя структуры было именно process_s. Видно, что структура имеет только одно поле-флаг exit, которое определяет работает процесс или можно завершить работу
    Note
    Язык С не имеет встроенного типа bool, поэтому для того чтобы его использовать необходимо включить заговок <stdbool.h>
  • 12-16 - вызывается для создания и инициализации структуры process_s. Указатель не структуру должен вернуться вызывающей стороне. Если по каким-то причинам инициализация закончилась неудачей, то необходимо вернуть NULL, процесс при этом завершит работу
  • 18-21 - финальный метод, который вызывается для освобождения ресурсов, аллоцированных, например, в process_s. Не вызывается, если proc_create завершился неудачей.
  • 23-26 - если нода является не stand alone, супернода init будет посылать start сообщение, если нода находится в рамках рабочего интервала. Если нода stand alone, то proc_start будет вызван автоматически при старте ноды вручную
  • 28-31 - если нода является не stand alone, супернода init будет посылать stop сообщение, если нода НЕ находится в рамках рабочего интервала. Если нода stand alone, то proc_stop, не вызывается
  • 33-37 - вызывается при получении сигналов SIGTERM, SIGINT. Вызывает остановку ноды. Здесь логично вызвать proc_stop и выставить флаг exit в 1, что сигнализирует о том, что нода готова к завершению
  • 39-42 - основной обработчик всех сообщений, которые могут быть присланы от других нод
  • 44-48 - вызывается внутри основного цикла процесса. Здесь важно вызвать xroad_node_receive с каким-нибудь временным интервалом. Здесь важно вернуть значение флага exit, чтобы просигнализировать, что процесс готов завершить свою работу. Если возвращается значение 0, это означает, что процесс готов завершить свою работу, значение >0 - процесс продолжает свою работу, < 0 - процесс завершает свою работу с ошибкой
  • 52-61 - здесь конфигурируется callback интерфейс xroad_process. xroad_process имеет также ряд необязательных callback, которые используются по необходимости:
    • activate - обработчик сообщения activate
    • deactivate - обработчик сообщения deactivate
    • reconfig - обработчик сообщения reconfig
    • date_changed - обработчик сообщения date_changed
    • reset - обработчик сообщения reset
    • exited - обработчик сообщения о том что какая-то нода системы завершила свою работу
      Note
      детальное описание всех обработчиков можно найти в sdk/include/common/xroad_process.h

Работа с конфигурацией

Из примера выше видно, что в строке 46 используется константа для обозначения таймаута ожидания 1000000. А что, если мы хотим получать это значения из конфигурации? Тем более что у нас есть robot.xml(xsd). Для этого расширим наш пример и добавим использование конфигурации и добавим обработчик reconfig:

1 #include <common/xroad_process.h>
2 #include <common/xroad_xml.h>
3 #include <node/xroad_node.h>
4 #include <logger/xroad_logger.h>
5 #include <stdint.h>
6 #include <stdbool.h>
7 
8 struct process_s
9 {
10  bool exit;
11 };
12 
13 xroad_errno_t configure(process_t* proc, xroad_xml_tag_t cfg)
14 {
15  return XROAD_OK;
16 }
17 
18 process_t* proc_create(int32_t argc, char* argv[])
19 {
20  process_t* proc = calloc(1, sizeof(process_t));
21  xroad_xml_doc_t* doc = xroad_node_get_config();
22  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
23  if (configure(proc, cfg) != XROAD_OK)
24  {
25  return NULL;
26  }
27  return proc;
28 }
29 
30 void proc_destroy(process_t* proc)
31 {
32  free(proc);
33 }
34 
35 void proc_start(process_t* proc)
36 {
38 }
39 
40 void proc_stop(process_t* proc)
41 {
43 }
44 
45 void proc_shutdown(process_t* proc)
46 {
47  proc_stop(proc);
48  proc->exit = true;
49 }
50 
51 void proc_on_node_object(process_t* proc, void* obj, xroad_node_id_t from)
52 {
53  xroad_log_debug("%O received from %u node", obj, from);
54 }
55 
56 xroad_errno_t proc_reconfig(process_t* proc)
57 {
58  xroad_xml_doc_t* doc = xroad_node_get_config();
59  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
60  return configure(proc, cfg);
61 }
62 
63 int32_t proc_enter_loop(process_t* proc)
64 {
66  return !proc->exit;
67 }
68 
69 int32_t main(int argc, char *argv[])
70 {
71  xroad_process_iface_t piface =
72  {
73  .create = proc_create,
74  .destroy = proc_destroy,
75  .start = proc_start,
76  .stop = proc_stop,
77  .shutdown = proc_shutdown,
78  .on_node_object = proc_on_node_object,
79  .enter_loop = proc_enter_loop,
80  .reconfig = proc_reconfig
81  };
82  return xroad_process(argc, argv, piface);
83 }
xroad_xml_doc_t * xroad_node_get_config(void)
xroad_xml_tag_t xroad_xml_get_root(xroad_xml_doc_t *doc)
  • 2 - добавляем заголовок, отвечающий за работу с xml конфигурацией
  • 13-16 - функция configure, отвечающая за чтение xml конфигурации:
    • 21-23 - при старте, нода читает свою конфигурацию (в нашем случае это robot.xml), обогащает ее атрибутами со значениями по умолчанию из node.xsd и robot.xsd и использует ее для стартовой инициализации. Далее эта конфигурация доступна в любом месте вызова xroad_node_get_cfg
    • т.к. для чтения конфигурации используется libxml2, то конфигурация представляет собой дерево объектов document->xml_tag->child_tag->attribute. Для чтения значений атрибутов используются методы:
      • xroad_xml_get_attr_i - для чтения целочисленного атрибута
      • xroad_xml_get_attr_d - для чтения атрибута с плавающей точкой
      • xroad_xml_get_attr_s - для чтения строчного атрибута
      • xroad_xml_get_attr_b - для чтения атрибута типа xs:boolean
        Note
        более подробно о том как работать с конфигурацией в sdk/include/common/xroad_xml.h
  • 56-60 - этот callback вызывается, когда ноде приходит сообщение reconfigure. К моменту вызова callback'a нода уже перечитала измененную конфигурацию, и она так же доступна по вызову xroad_node_get_cfg
  • 79 - настраиваем интерфейс процесса, добавив proc_reconfig в интерсейс вызовов

Работа с рыночными данными

Для работы с рыночными данными используется заголовок sdk/include/mdata/engine/mdata_engine.h и библиотека sdk/lib/libmdata.so. Расширим код нашего робота, чтобы он умел брать название инструмента из файла конфигурации, находить его в таблице инструментов и подписываться на рыночные данные по этому инструменту. Для начала расширим конфигурацию робота. Добавим тэг <app> в котором будем задавать название инструмента, на данные которого надо подписаться и добавим секцию, которая подключит настройки mdata_engine. Сначала изменим robot.xml:

<?xml version="1.0"?>
<config>
<node
log_level="debug"
/>
<mdata_engine/>
<app mdata_instr="SBER.TQBR"/>
</config>
  • 6 - во-первых мы добавили секцию <mdata_engine>, которая настраивает нашу библиотеку mdata_engine. Полное описание секции доступно в config/mdata_engine.xsd
  • 7 - во-вторых добавилась секция <app> в которой атрибутом mdata_instr задается инструмент, на данные которого надо подписаться. В нашем случае это SBER.TQBR

Теперь необходимо изменить robot.xsd таким образом, чтобы изменения в robot.xml успешно валидировались:

<?xml version="1.0"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:include schemaLocation="node.xsd"/>
<xs:include schemaLocation="mdata_engine.xsd"/>
<xs:element name="config">
<xs:complexType>
<xs:sequence>
<xs:element name="node" type="node_type" />
<xs:element name="mdata_engine" type="mdata_engine_type" />
<xs:element name="app" minOccurs="0">
<xs:complexType>
<xs:attribute name="mdata_instr" type="non_empty_string_type" use="required"/>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>

:

  • 4 - подключили схему конфигурации mdata_engine
  • 10 - указали в каком месте xml файла должна располагаться секция mdata_engine
  • 11-15 - описание тэга <app> с одним строковым атрибутом mdata_instr \После того как изменения в xml/xsd файлах выполнены их необходимо проверить при помощи скрипта check_cfg.sh

Теперь внесем изменения в код нашего робота:

2 #include <common/xroad_process.h>
3 #include <common/xroad_xml.h>
4 #include <node/xroad_node.h>
5 #include <logger/xroad_logger.h>
6 #include <instrdb/instrdb.h>
8 #include <stdint.h>
9 #include <stdbool.h>
10 
11 struct process_s
12 {
13  bool exit;
14  int32_t wait_timeout;
15  instrdb_t* idb;
16  mdata_engine_t* mdata;
17 };
18 
19 void on_mdata_subscribe_result(mdata_subscribe_result_t* res, void* ctx)
20 {
21  process_t* proc = (process_t*)ctx;
22  xroad_instr_t* i = instrdb_get_by_id(proc->idb, res->instr_id);
23  if (i)
24  {
25  xroad_log_info("on_subscribe_result: instr = %S, mask = %d, result = %d",
26  xroad_instr_get_alias(i), res->mask, res->error_num);
27  }
28 }
29 
30 void on_mdata_feed_state(mdata_feed_state_type_t state, void* ctx)
31 {
32  xroad_log_info("feed state is %d", state);
33 }
34 
35 void on_mdata(mdata_proto_type_t t, void* mdata, void* ctx)
36 {
37  process_t* proc = (process_t*)ctx;
38  switch(t)
39  {
40  case mdata_proto_type_quote:
41  {
42  mdata_quote_t* quote = (mdata_quote_t*)mdata;
43  xroad_instr_t* i = instrdb_get_by_id(proc->idb, quote->instr_id);
44  if (i)
45  {
46  xroad_str_t alias = xroad_instr_get_alias(i);
47  xroad_log_info("quote received: name = %P, bid = %d@%f, ask = %d@%f",
48  alias, quote->bid.price, quote->bid.qty, quote->ask.price, quote->ask.qty);
49  }
50  }
51  case mdata_proto_type_trade:
52  {
53  mdata_trade_t* trade = (mdata_trade_t*)mdata;
54  xroad_instr_t* i = instrdb_get_by_id(proc->idb, trade->instr_id);
55  if (i)
56  {
57  xroad_str_t alias = xroad_instr_get_alias(i);
58  xroad_log_info("trade received: name = %P, %d@%f", alias, trade->qty, trade->price);
59  }
60  }
61  case mdata_proto_type_book:
62  {
63  mdata_book_20_t* book = (mdata_book_20_t*)mdata;
64  xroad_instr_t* i = instrdb_get_by_id(proc->idb, book->instr_id);
65  if (i)
66  {
67  xroad_str_t alias = xroad_instr_get_alias(i);
68  xroad_log_info("best ask %P %D@%f", alias, book->asks[0].qty, book->asks[0].price);
69  xroad_log_info("best bid %P %D@%f", alias, book->bids[0].qty, book->bids[0].price);
70  }
71  break;
72  }
73  default:
74  {
75  }
76  }
77 }
78 
79 void on_mdata_connected(void* ctx)
80 {
81  process_t* proc = (process_t*)ctx;
82  xroad_xml_doc_t* doc = xroad_node_get_config();
83  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
84  xroad_xml_tag_t tag = xroad_xml_get_tag(cfg, xroad_str("app"));
85  xroad_str_t mdata_instr = xroad_xml_get_attr_s(tag, xroad_str("mdata_instr"));
86  xroad_instr_t* i = instrdb_get_by_alias(proc->idb, mdata_instr);
87  if (!i)
88  {
89  xroad_log_error("instrument not found by alias %P", mdata_instr);
90  return;
91  }
92  mdata_callback_t channel_callback =
93  {
94  .ctx = proc,
95  .on_mdata = on_mdata
96  };
97  mdata_engine_subscribe(proc->mdata, i,
98  mdata_subscription_trade | mdata_subscription_quote | mdata_subscription_updates, channel_callback);
99 }
100 
101 void on_mdata_disconnected(void* ctx)
102 {
103  xroad_log_info("mdata disconnected");
104 }
105 
106 xroad_errno_t configure(process_t* proc, xroad_xml_tag_t cfg)
107 {
108  return XROAD_OK;
109 }
110 
111 process_t* proc_create(int32_t argc, char* argv[])
112 {
113  process_t* proc = calloc(1, sizeof(process_t));
114  xroad_xml_doc_t* doc = xroad_node_get_config();
115  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
116  proc->idb = instrdb_create(xroad_xml_get_tag(cfg, xroad_str("node")));
117  if (!proc->idb)
118  {
119  return NULL;
120  }
121  mdata_engine_callback_t mdata_callback =
122  {
123  .ctx = proc,
124  .on_subscribe_result = on_mdata_subscribe_result,
125  .on_feed_state = on_mdata_feed_state,
126  .on_connected = on_mdata_connected,
127  .on_disconnected = on_mdata_disconnected,
128  };
129  proc->mdata = mdata_engine_create(xroad_xml_get_tag(cfg, xroad_str("mdata_engine")), mdata_callback);
130  if (!proc->mdata)
131  {
132  return NULL;
133  }
134  if (configure(proc, cfg) != XROAD_OK)
135  {
136  return NULL;
137  }
138  return proc;
139 }
140 
141 void proc_destroy(process_t* proc)
142 {
143  free(proc);
144 }
145 
146 void proc_start(process_t* proc)
147 {
148  mdata_engine_start(proc->mdata);
150 }
151 
152 void proc_stop(process_t* proc)
153 {
154  mdata_engine_stop(proc->mdata);
156 }
157 
158 void proc_shutdown(process_t* proc)
159 {
160  proc_stop(proc);
161  proc->exit = true;
162 }
163 
164 void proc_on_node_object(process_t* proc, void* obj, xroad_node_id_t from)
165 {
166  xroad_log_debug("%O received from %u node", obj, from);
167 }
168 
169 xroad_errno_t proc_reconfig(process_t* proc)
170 {
171  xroad_xml_doc_t* doc = xroad_node_get_config();
172  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
173  return configure(proc, cfg);
174 }
175 
176 int32_t proc_enter_loop(process_t* proc)
177 {
179  return !proc->exit;
180 }
181 
182 int32_t main(int argc, char *argv[])
183 {
184  xroad_process_iface_t piface =
185  {
186  .create = proc_create,
187  .destroy = proc_destroy,
188  .start = proc_start,
189  .stop = proc_stop,
190  .shutdown = proc_shutdown,
191  .on_node_object = proc_on_node_object,
192  .enter_loop = proc_enter_loop,
193  .reconfig = proc_reconfig
194  };
195  return xroad_process(argc, argv, piface);
196 }
mdata_engine_t * mdata_engine_create(xroad_xml_tag_t cfg, mdata_engine_callback_t ccb)
xroad_errno_t mdata_engine_start(mdata_engine_t *mdata)
void mdata_engine_stop(mdata_engine_t *mdata)
xroad_errno_t mdata_engine_subscribe(mdata_engine_t *mdata, xroad_instr_t *instr, mdata_subscription_mask_t mask, mdata_callback_t cb, xroad_str_t mdsource)
Definition: mdata_proto.h:107
Definition: mdata_engine.h:46
Definition: mdata_engine.h:18
Definition: mdata_proto.h:118
Definition: mdata_proto.h:97
Definition: mdata_proto.h:130
Definition: xroad_string.h:29
#define xroad_log_error(fmt,...)
Definition: xroad_logger.h:103
#define xroad_log_info(fmt,...)
Definition: xroad_logger.h:172
#define xroad_str(str)
Definition: xroad_string.h:165
xroad_xml_tag_t xroad_xml_get_tag(xroad_xml_tag_t tag, xroad_str_t path)
xroad_str_t xroad_xml_get_attr_s(xroad_xml_tag_t tag, xroad_str_t attribute)
  • 5-6 - прежде всего добавляем необходимые заголовки instrdb.h и mdate_engine.h для работы с инструментами и рыночными данными соответственно
  • 14-15 - переменные указатели на экземпляры библиотек
  • 115-132 - здесь происходит инициализация библиотек. Как видно mdata_engine содержит 5 callback функций:
    • on_subscribe_result - возвращает статус подписки на инструмент
    • on_feed_state - возвращает текущий стату потока данных
    • on_connected - вызывается, когда библиотека присоединилась к потоку данных
    • on_disconnected - вызывается, когда библиотека отсоединилась от потока данных
  • 147 - mdata_engine необходимо запустить при старте ноды. Иногда удобнее запускать mdata_engine при активации ноды(получения сообщения activate)
  • 153 - mdata_engine необходимо остановить при остановке ноды. Иногда удобнее останавливать mdata_engine при деактивации ноды(получения сообщения deactivate)
  • 18-27 - позволяет получить статус подписки на конкретный инструмент
  • 29-32 - позволяет получить статус потока данных и как-то отреагировать, если, например, произошло отключение данных
  • 34-76 - в этом callback'e происходит обработка рыночных данных. Как видно обрабатываются 3 вида данных: quote, trade, book. Более подробно структуры рыночных данных описаны в sdk/include/mdata/engine/mdata_proto_types.h
  • 78-98 - вызывается в момент, когда mdata_engine подключилась к потоку данных:
  • 84-90 - название инструмента, на рыночные данные которого надо подписаться извлекается из файла конфигурации. Как мы помним, он задается атрибутом mdata_instr в тэге <app>. Полученное название инструмента проверяется по таблице инструментов
  • 91-97 - если такой инструмент существует, то делаем подписку на quote, trade
  • 100-103 - вызывается, когда mdata_engine отключается от потока данных

Собираем:

  gcc -O3 -g robot3.c -I$XROAD_ROOT_DIR/sdk/include -I/usr/include/libxml2 -L$XROAD_ROOT_DIR/sdk/lib -lcommon -lnode -lmdata -linstrdb -o robot

Работа с ордерами

Теперь, когда у нас есть рыночные данные мы будем выставлять заявку на покупку по цене лучшей покупки. И как только заявка активируется снимать ее. Для начала изменим конфигурацию робота - мы будем получать значения счета, кода клиента и название торгуемого инструмента из конфигурации.

robot.xml:

<?xml version="1.0"?>
<config>
<node
log_level="debug"
/>
<mdata_engine/>
<app mdata_instr="SBER.TQBR" order_instr="SBER.TQBR" account="S01-00000F00" client_code="1" />
</config>

robot.xsd:

<?xml version="1.0"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:include schemaLocation="node.xsd"/>
<xs:include schemaLocation="mdata_engine.xsd"/>
<xs:element name="config">
<xs:complexType>
<xs:sequence>
<xs:element name="node" type="node_type" />
<xs:element name="mdata_engine" type="mdata_engine_type" />
<xs:element name="app" minOccurs="0">
<xs:complexType>
<xs:attribute name="mdata_instr" type="non_empty_string_type" use="required"/>
<xs:attribute name="order_instr" type="non_empty_string_type" use="required"/>
<xs:attribute name="account" type="non_empty_string_type" use="required"/>
<xs:attribute name="client_code" type="non_empty_string_type" use="required"/>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>

Добавим в robot.c код для работы с ордером:

4 #include <common/xroad_process.h>
5 #include <common/xroad_xml.h>
6 #include <node/xroad_node.h>
7 #include <logger/xroad_logger.h>
8 #include <instrdb/instrdb.h>
10 #include <order/order.h>
11 #include <order/order_aux.h>
12 #include <stdint.h>
13 #include <stdbool.h>
14 
15 struct process_s
16 {
17  bool exit;
18  instrdb_t* idb;
19  mdata_engine_t* mdata;
20  order_t* order;
21 };
22 
23 void on_order_activated(order_t* order, xroad_str_t exch_id, xroad_timestamp_t ts)
24 {
25  xroad_log_debug("order %P has been on_activated", order_get_name(order));
26  order_cancel(order);
27 }
28 
29 void on_order_trade(order_t* order, order_trade_t* trade)
30 {
31  xroad_log_debug("trade received");
32  if (order_get_state(order) == order_state_filled)
33  {
34  xroad_log_info("order has been completelly filled");
35  }
36 }
37 
38 void on_order_canceled(order_t* order, xroad_rej_reason_t reason, xroad_str_t text)
39 {
40  xroad_log_info("order %P has been canceled", order_get_name(order));
41 }
42 
43 void on_order_unexpected_canceled(order_t* order, xroad_rej_reason_t reason, xroad_str_t text)
44 {
45  xroad_log_error("order %P canceled unexpectedly", order_get_name(order));
46 }
47 
48 void on_order_cancel_rejected(order_t* order, xroad_rej_reason_t reason, xroad_str_t text)
49 {
50  xroad_log_error("order %P cancel rejected. reason = %P", order_get_name(order), text);
51 }
52 
53 void on_order_destroyed(order_t* order)
54 {
55  xroad_log_info("order %P destroyed", order_get_name(order));
56 }
57 
58 void on_order_rejected(order_t* order, xroad_rej_reason_t reason, xroad_str_t text)
59 {
60  xroad_log_error("order %P rejected. reason = %P", order_get_name(order), text);
61 }
62 
63 void on_mdata_subscribe_result(mdata_subscribe_result_t* res, void* ctx)
64 {
65  process_t* proc = (process_t*)ctx;
66  xroad_instr_t* i = instrdb_get_by_id(proc->idb, res->instr_id);
67  if (i)
68  {
69  xroad_log_info("on_subscribe_result: instr = %S, mask = %d, result = %d",
70  xroad_instr_get_alias(i), res->mask, res->error_num);
71  }
72 }
73 
74 void on_mdata_feed_state(mdata_feed_state_type_t state, void* ctx)
75 {
76  xroad_log_info("feed state is %d", state);
77 }
78 
79 void on_mdata(mdata_proto_type_t t, void* mdata, void* ctx)
80 {
81  process_t* proc = (process_t*)ctx;
82  switch(t)
83  {
84  case mdata_proto_type_quote:
85  {
86  mdata_quote_t* quote = (mdata_quote_t*)mdata;
87  xroad_instr_t* i = instrdb_get_by_id(proc->idb, quote->instr_id);
88  if (i)
89  {
90  xroad_str_t alias = xroad_instr_get_alias(i);
91  xroad_log_info("quote received: name = %P, bid = %d@%f, ask = %d@%f",
92  alias, quote->bid.price, quote->bid.qty, quote->ask.price, quote->ask.qty);
93  if (order_is_done(proc->order))
94  {
95  order_replace(proc->order, .price = quote->bid.price);
96  order_send(proc->order);
97  }
98  }
99  }
100  case mdata_proto_type_trade:
101  {
102  mdata_trade_t* trade = (mdata_trade_t*)mdata;
103  xroad_instr_t* i = instrdb_get_by_id(proc->idb, trade->instr_id);
104  if (i)
105  {
106  xroad_str_t alias = xroad_instr_get_alias(i);
107  xroad_log_info("trade received: name = %P, %d@%f", alias, trade->qty, trade->price);
108  }
109  }
110  case mdata_proto_type_book:
111  {
112  mdata_book_20_t* book = (mdata_book_20_t*)mdata;
113  xroad_instr_t* i = instrdb_get_by_id(proc->idb, book->instr_id);
114  if (i)
115  {
116  xroad_str_t alias = xroad_instr_get_alias(i);
117  xroad_log_info("best ask %P %D@%f", alias, book->asks[0].qty, book->asks[0].price);
118  xroad_log_info("best bid %P %D@%f", alias, book->bids[0].qty, book->bids[0].price);
119  }
120  break;
121  }
122  default:
123  {
124  }
125  }
126 }
127 
128 void on_mdata_connected(void* ctx)
129 {
130  process_t* proc = (process_t*)ctx;
131  xroad_xml_doc_t* doc = xroad_node_get_config();
132  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
133  xroad_xml_tag_t tag = xroad_xml_get_tag(cfg, xroad_str("app"));
134  xroad_str_t mdata_instr = xroad_xml_get_attr_s(tag, xroad_str("mdata_instr"));
135  xroad_instr_t* i = instrdb_get_by_alias(proc->idb, mdata_instr);
136  if (!i)
137  {
138  xroad_log_error("instrument not found by alias %P", mdata_instr);
139  return;
140  }
141  mdata_callback_t channel_callback =
142  {
143  .ctx = proc,
144  .on_mdata = on_mdata
145  };
146  mdata_engine_subscribe(proc->mdata, i,
147  mdata_subscription_trade | mdata_subscription_quote | mdata_subscription_updates, channel_callback);
148 }
149 
150 void on_mdata_disconnected(void* ctx)
151 {
152  xroad_log_info("mdata disconnected");
153 }
154 
155 xroad_errno_t configure(process_t* proc, xroad_xml_tag_t cfg)
156 {
157  return XROAD_OK;
158 }
159 
160 order_t* create_order(process_t* proc, xroad_xml_tag_t cfg)
161 {
162  order_callback_t ocback =
163  {
164  .on_activated = on_order_activated,
165  .on_trade = on_order_trade,
166  .on_canceled = on_order_canceled,
167  .on_destroyed = on_order_destroyed,
168  .on_rejected = on_order_rejected,
169  .on_cancel_rejected = on_order_cancel_rejected
170  };
171  xroad_str_t account = xroad_xml_get_attr_s(cfg, xroad_str("account"));
172  xroad_str_t client_code = xroad_xml_get_attr_s(cfg, xroad_str("client_code"));
173  xroad_str_t instr_alias = xroad_xml_get_attr_s(cfg, xroad_str("order_instr"));
174  xroad_instr_t* instr = instrdb_get_by_alias(proc->idb, instr_alias);
175  if (!instr)
176  {
177  xroad_log_error("order instrument %P not found", instr_alias);
178  return NULL;
179  }
180  xroad_str_t err = {};
181  order_t* order = order_create(
182  xroad_str("order"),
183  ocback,
184  instr,
185  xroad_side_buy,
186  .account = account,
187  .client_code = client_code,
188  .qty = 1,
189  .ctx = proc,
190  .err = &err);
191  if (!order)
192  {
193  xroad_log_error("unable to create order. reason = %P", err);
194  }
195  return order;
196 }
197 
198 process_t* proc_create(int32_t argc, char* argv[])
199 {
200  process_t* proc = calloc(1, sizeof(process_t));
201  xroad_xml_doc_t* doc = xroad_node_get_config();
202  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
203  proc->idb = instrdb_create(xroad_xml_get_tag(cfg, xroad_str("node")));
204  if (!proc->idb)
205  {
206  return NULL;
207  }
208  mdata_engine_callback_t mdata_callback =
209  {
210  .ctx = proc,
211  .on_subscribe_result = on_mdata_subscribe_result,
212  .on_feed_state = on_mdata_feed_state,
213  .on_connected = on_mdata_connected,
214  .on_disconnected = on_mdata_disconnected,
215  };
216  proc->mdata = mdata_engine_create(xroad_xml_get_tag(cfg, xroad_str("mdata_engine")), mdata_callback);
217  if (!proc->mdata)
218  {
219  return NULL;
220  }
221  proc->order = create_order(proc, xroad_xml_get_tag(cfg, xroad_str("app")));
222  if (!proc->order)
223  {
224  return NULL;
225  }
226  if (configure(proc, cfg) != XROAD_OK)
227  {
228  return NULL;
229  }
230  return proc;
231 }
232 
233 void proc_destroy(process_t* proc)
234 {
235  free(proc);
236 }
237 
238 void proc_start(process_t* proc)
239 {
240  mdata_engine_start(proc->mdata);
242 }
243 
244 void proc_stop(process_t* proc)
245 {
246  mdata_engine_stop(proc->mdata);
248 }
249 
250 void proc_shutdown(process_t* proc)
251 {
252  proc_stop(proc);
253  proc->exit = true;
254 }
255 
256 void proc_on_node_object(process_t* proc, void* obj, xroad_node_id_t from)
257 {
258  xroad_log_debug("%O received from %u node", obj, from);
259 }
260 
261 xroad_errno_t proc_reconfig(process_t* proc)
262 {
263  xroad_xml_doc_t* doc = xroad_node_get_config();
264  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
265  return configure(proc, cfg);
266 }
267 
268 int32_t proc_enter_loop(process_t* proc)
269 {
271  return !proc->exit;
272 }
273 
274 int32_t main(int argc, char *argv[])
275 {
276  xroad_process_iface_t piface =
277  {
278  .create = proc_create,
279  .destroy = proc_destroy,
280  .start = proc_start,
281  .stop = proc_stop,
282  .shutdown = proc_shutdown,
283  .on_node_object = proc_on_node_object,
284  .enter_loop = proc_enter_loop,
285  .reconfig = proc_reconfig
286  };
287  return xroad_process(argc, argv, piface);
288 }
@ order_state_filled
order is in filled state
Definition: order.h:29
xroad_str_t order_get_name(const order_t *o)
xroad_errno_t order_send(order_t *o)
order_state_t order_get_state(const order_t *o)
Definition: order.h:102
void(* on_activated)(order_t *, xroad_str_t, xroad_timestamp_t)
Definition: order.h:106
CAUTION: do not change this file - it is autogenerated.
  • 10-11 - прежде всего добавим необходимые заголовки для работы с ордерами
  • 20 - добавим указатель на экземпляр ордера
  • 160-196 - добавит метод create_order, который будет создавать ордер:
    • 162 - определяем интерфейс ордера: добавляем callback'и на те события, которые мы хотим обрабатывать
    • 171-173 - читаем значения атрибутов account, client_code, order_instr из конфигурации
    • 174-179 - пытаемся найти инструмент в таблице инструментов, если не найден, то ордер не может быть созданным
    • 181-190 - непосредственно создание самого объекта ордера:
      • 182 - имя ордера, используется для логирования. Должно быть уникальным среди всех ордеров
      • 188 - размер ордера в лотах или в штуках (единицы задаются флагами ордера)
      • 189 - передаем в ордер указатель на process в качестве контескта
      • 191-195 - если по каким-то причинам ордер не был создан, то order_create вернет NULL, а err будет содержать причину ошибки
  • 221 - вызываем метод create_order непосредственно в процессе создания процесса. Вовсе не обязательно делать это именно при создании процесса
  • 91-96 - на каждую квоту по инструменту мы будем пытаться выставить ордер, но только при условии, что ордер либо исполнился, либо снят, либо не посылался на биржу. Для этого мы проверяем что ордер находится в подходящем состоянии order_is_done.
  • 23-61 - для работы с событиями ордера определены callback функции:
    • 23-27 - on_order_activated, вызывается, когда ордер становится активным на бирже. Как только мы получаем событие, что ордер активирован, он снимается (26)
  • 29-36 - on_order_trade, вызывается, когда ордер исполняется полностью или частично
  • 38-41 - on_order_canceled, вызывается, когда заявка была снята на бирже
  • 43-46 - on_order_unexpected_canceled, вызывается, когда заявка снята на бирже, но это событие не было инициировано вызовом order_cancel
  • 48-51 - on_order_cancel_rejected, вызывается, когда запрос на снятие ордера по каким-то причинам был отклонен
  • 53-56 - on_order_destroyed, вызывается в ответ на вызов order_destroy. При этом, если ордер был активен на бирже он сначала будет снят
  • 58-61 - on_order_rejected, вызывается если ордер был отклонен биржей

Собираем:

  gcc -O3 -g robot4.c -I$XROAD_ROOT_DIR/sdk/include -I/usr/include/libxml2 -L$XROAD_ROOT_DIR/sdk/lib -lcommon -lnode -lmdata -linstrdb -lorder -o robot

Использование бинарного логгера

В системе используется бинарный логгер, который записывает логи как есть не используя форматирование. Для того чтобы прочитать лог файл используется bin/xroad_log_reader утилита. Для того чтобы начать использовать логгер в коде, достаточно подключить заголовок

и слинковаться с

sdk/lib/liblogger.so

Наиболее часто для логирования используются методы:

  • xroad_log_info - для логирования информации
  • xroad_log_debug - для логирования отладочной информации
  • xroad_log_error - для логирования ошибок
  • xroad_log_warn - для логирования предупреждений
  • xroad_log_trace - для логирования трейс информации

Например:

xroad_log_info("Hello, my name is %P. I am %d years old", xroad_str("Piter"), 21);

Для форматирования текста используются такой же принцип как и для printf, т.е. используется строка формата, которая содержит набор форматтеров (d, f, s и т.д.) и список аргументов. xroad_logger отличается от printf только тем что использует свой набор форматтеров:

  • %d - аргумент типа int32_t
  • %D - аргумент типа int64_t
  • %u - аргумент типа uint32_t
  • %U - аргумент типа uint64_t
  • %c - аргумент типа char
  • %s(zs) - aргумент типа С-string
  • %S(zS) - аргумент типа fixed length string, которая создается макросом xroad_str_decl
  • %P(zP) - аргумент типа xroad_str_t
  • %f - аргумент типа double
  • %X - аргумент типа byte array. В качестве аргумента передается пара значений длинна массива и указатель на массив
  • %O - аргумент типа объект из xroad_objects.h
  • %R - аргумент типа ссылка на объект xroad_object_ref_t
  • %i - аргумент типа xroad_time_t
  • %a - аргумент типа xroad_date_t
  • %g - аргумент типа xroad_datetime_t

Использование нескольких логгеров

Логер, который используется методами xroad_log_* и xroad_vlog_* называется main(главный) логгер. Или логгер по умолчанию. Помимо главного логгера, имеется возможность создавать другие логгеры, которые будут иметь индивидуальные настройки.

Note
Подробнее о настройках можно прочитать в главе Конфигурация логгеров Для создания нового логгера используется метод xroa_logger_get, которому в качестве парметра передается имя нового логгера. Если логгер с таким именем уже создан, то он просто возвращается. Если логгер с таким именем не сконфигурирован, то новый логгер наследует настройки главного логгера. Для логирования информации с использованием созданного логгера используется семейство xroad_logx_* и xroad_vlogx_* Например:
xroad_logger_t* logger = xroad_logger_get(xroad_str("marker_data"));
xroad_logx_info(logger, "new trade received %d@%f", qty, price);
xroad_logx_error(NULL, "panic!!!!");
#define xroad_logx_info(logger, fmt,...)
Definition: xroad_logger.h:162
#define xroad_logx_error(logger, fmt,...)
Definition: xroad_logger.h:93

Если вызвать метод, как это показано в строке 3 (c нулевым указателем), то будет вызван главный логгер.

Пример того, как будет выглядеть залогированная информация, прочитанная при помощи xroad_log_reader'a:

23:30:58.006509 [I] [market_data] - new trade received 100@123.12
23:30:58.006510 [E] [main] - panic!!!

Упрощаем работу с рыночными данными и ордерами

Часто, для создания стратегий все эти манипуляции (инициализация библиотеки работы с рыночными данными, с ордерами) могут показаться утомительными. Для целей упрощения работы можно использовать strategy библиотеку. Эта библиотека скрывает все детали инициализации и подписки на нужные рыночные данные, а так же создает все необходимые ордера используя файл конфигурации:

robot5.xml:

<?xml version="1.0"?>
<config>
<node
log_level="debug"
/>
<strategy>
<mdata>
<mdata_engine/>
<subscriptions>
<subscription alias="Si-6.17" mask="54"/>
</subscriptions>
</mdata>
<orders>
<order name="order_buy"
instr="Si-6.17"
account="h71"
side="buy"/>
</orders>
</strategy>
</config>

Как видно из примера, в строках 8-14 происходит инициализация и подписка на рыночные данные по инструменту Si-6.17, а в строках 15-20 создается ордер order_buy. При этом этот ордер не оправляются на биржу. Для того чтобы отправить его на биржу используется метод strategy_send_order или strategy_send_order_ex. Для подписки на нужные рыночные данные используется маска, которая преставляет собой побитовое "И" из перечисления mdata_proto_type_t.

robot5.c

1 #include <common/xroad_process.h>
2 #include <common/xroad_xml.h>
3 #include <node/xroad_node.h>
4 #include <logger/xroad_logger.h>
5 #include <strategy/strategy.h>
6 #include <order/order_aux.h>
7 #include <stdint.h>
8 #include <stdbool.h>
9 
10 struct process_s
11 {
12  bool exit;
13  strategy_t* strategy;
14 };
15 
16 void on_order_activated(strategy_t* s, order_t* order, void* ctx)
17 {
18  xroad_log_debug("order %P has been on_activated", order_get_name(order));
19  order_cancel(order);
20 }
21 
22 void on_order_trade(strategy_t* s, order_t* order, order_trade_t* trade, void* ctx)
23 {
24  xroad_log_debug("trade received");
25  if (order_get_state(order) == order_state_filled)
26  {
27  xroad_log_info("order has been completelly filled");
28  }
29 }
30 
31 void on_order_canceled(strategy_t* s, order_t* order, xroad_rej_reason_t reason, xroad_str_t text, void* ctx)
32 {
33  xroad_log_info("order %P has been canceled", order_get_name(order));
34 }
35 
36 void on_order_cancel_rejected(strategy_t* s, order_t* order, xroad_rej_reason_t reason, xroad_str_t text, void* ctx)
37 {
38  xroad_log_error("order %P cancel rejected. reason = %P", order_get_name(order), text);
39 }
40 
41 void on_order_rejected(strategy_t* s, order_t* order, xroad_rej_reason_t reason, xroad_str_t text, void* ctx)
42 {
43  xroad_log_error("order %P rejected. reason = %P", order_get_name(order), text);
44 }
45 
46 void on_mdata_trade(strategy_t* s, xroad_instr_t* i, const mdata_trade_t* trade, void* ctx)
47 {
48  xroad_str_t alias = xroad_instr_get_alias(i);
49  xroad_log_info("trade received: name = %P, %d@%f", alias, trade->qty, trade->price);
50 }
51 
52 void on_mdata_quote(strategy_t* s, xroad_instr_t* i, const mdata_quote_t* quote, void* ctx)
53 {
54  xroad_str_t alias = xroad_instr_get_alias(i);
55  xroad_log_info("quote received: name = %P, bid = %d@%f, ask = %d@%f",
56  alias, quote->bid.price, quote->bid.qty, quote->ask.price, quote->ask.qty);
57  strategy_send_order(s, xroad_str("order_buy"), .price = quote->bid.price, .qty = 10);
58 }
59 
60 void on_mdata_book(strategy_t* s, xroad_instr_t* i, const mdata_book_20_t* book, void* ctx)
61 {
62  xroad_str_t alias = xroad_instr_get_alias(i);
63  xroad_log_info("best ask %P %D@%f", alias, book->asks[0].qty, book->asks[0].price);
64  xroad_log_info("best bid %P %D@%f", alias, book->bids[0].qty, book->bids[0].price);
65 }
66 
67 xroad_errno_t configure(process_t* proc, xroad_xml_tag_t cfg)
68 {
69  return XROAD_OK;
70 }
71 
72 process_t* proc_create(int32_t argc, char* argv[])
73 {
74  process_t* proc = calloc(1, sizeof(process_t));
75  xroad_xml_doc_t* doc = xroad_node_get_config();
76  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
77  strategy_callback_t callback =
78  {
79  .ctx = proc,
80  .on_mdata_trade = on_mdata_trade,
81  .on_mdata_quote = on_mdata_quote,
82  .on_mdata_book = on_mdata_book,
83  .on_order_activated = on_order_activated,
84  .on_order_trade = on_order_trade,
85  .on_order_canceled = on_order_canceled,
86  .on_order_rejected = on_order_rejected,
87  .on_order_cancel_rejected = on_order_cancel_rejected
88  };
89  proc->strategy = strategy_create(xroad_xml_get_tag(cfg, xroad_str("strategy")), callback);
90  if (!proc->strategy)
91  {
92  return NULL;
93  }
94  if (configure(proc, cfg) != XROAD_OK)
95  {
96  return NULL;
97  }
98  return proc;
99 }
100 
101 void proc_destroy(process_t* proc)
102 {
103  free(proc);
104 }
105 
106 void proc_start(process_t* proc)
107 {
108  strategy_start(proc->strategy);
110 }
111 
112 void proc_stop(process_t* proc)
113 {
114  strategy_stop(proc->strategy);
116 }
117 
118 void proc_shutdown(process_t* proc)
119 {
120  proc_stop(proc);
121  proc->exit = true;
122 }
123 
124 void proc_on_node_object(process_t* proc, void* obj, xroad_node_id_t from)
125 {
126  xroad_log_debug("%O received from %u node", obj, from);
127 }
128 
129 xroad_errno_t proc_reconfig(process_t* proc)
130 {
131  xroad_xml_doc_t* doc = xroad_node_get_config();
132  xroad_xml_tag_t cfg = xroad_xml_get_root(doc);
133  return configure(proc, cfg);
134 }
135 
136 int32_t proc_enter_loop(process_t* proc)
137 {
139  return !proc->exit;
140 }
141 
142 int32_t main(int argc, char *argv[])
143 {
144  xroad_process_iface_t piface =
145  {
146  .create = proc_create,
147  .destroy = proc_destroy,
148  .start = proc_start,
149  .stop = proc_stop,
150  .shutdown = proc_shutdown,
151  .on_node_object = proc_on_node_object,
152  .enter_loop = proc_enter_loop,
153  .reconfig = proc_reconfig
154  };
155  return xroad_process(argc, argv, piface);
156 }
xroad_errno_t strategy_stop(strategy_t *s)
xroad_errno_t strategy_start(strategy_t *s)
strategy_t * strategy_create(xroad_xml_tag_t cfg, strategy_callback_t cback)
Definition: strategy.h:54
void * ctx
Definition: strategy.h:58

Как видно из примера, он стал короче на более чем 100 строчек кода, потому что strategy библиотека взяла на себя всю инициализацию необходимых библиотек.

  • 89-92 - в этих строчках происходит инициализация библиотеки strategy конфигурацией и каллбэками
  • 52-65 - вместо одного каллбэка on_mdata в предыдущем примере используется 3 - отдельных on_mdata_trade, on_mdata_quote, on_mdata_book

вся остальная логика работы осталась без изменений