/* Copyright (c) 2014, MOEX. All rights reserved. Plaza-2 Client Gate API Usage Sample. Process orders log steram. All the software and documentation included in this and any other MOEX CGate Releasese is copyrighted by MOEX. Redistribution and use in source and binary forms, with or without modification, are permitted only by the terms of a valid software license agreement with MOEX. THIS SOFTWARE IS PROVIDED "AS IS" AND MICEX-RTS DISCLAIMS ALL WARRANTIES EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION, ANY IMPLIED WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. MICEX-RTS DOES NOT WARRANT THAT USE OF THE SOFTWARE WILL BE UNINTERRUPTED OR ERROR-FREE. MICEX-RTS SHALL NOT, UNDER ANY CIRCUMSTANCES, BE LIABLE TO LICENSEE FOR LOST PROFITS, CONSEQUENTIAL, INCIDENTAL, SPECIAL OR INDIRECT DAMAGES ARISING OUT OF OR RELATED TO THIS AGREEMENT OR THE TRANSACTIONS CONTEMPLATED HEREUNDER, EVEN IF MICEX-RTS HAS BEEN APPRISED OF THE LIKELIHOOD OF SUCH DAMAGES. */ /* Настоящий пример демонстрирует возможность установки соединения и получения данных по двум независимым соединениям с роутерами. */ #ifndef _WIN32 #include "conio.h" #else #include #include #endif #include #include #include "multi_connector_base.hpp" // флаг выхода из программы // // helper variable used as a notification flag to exit static volatile BOOL bExit = FALSE; #ifdef _WIN32 BOOL WINAPI InterruptHandler(DWORD reason) { log_info("----BREAK----"); bExit = TRUE; return bExit; } #endif//_WIN32 static CONNECTION_CTX connection_ctx [] = { {"", nullptr, nullptr}, {"", nullptr, nullptr} }; static size_t connection_cnt = sizeof(connection_ctx)/sizeof(connection_ctx[0]); // Класс наследник, реализующий желаемую // логику обработки сообщений // // Derived class that implements the desired // message processing class MultiConnectorChild : public MultiConnectorBase { public: MultiConnectorChild (CONNECTION_CTX* conn_ctx, size_t connection_cnt, const std::string& stream) : MultiConnectorBase (conn_ctx, connection_cnt, stream.c_str()) , _connection_cnt(connection_cnt) , _conn_ctx(conn_ctx) { open_stat_files(); } ~MultiConnectorChild() { close_stat_files(); } private: void open_stat_files() const { _ofs_msg_outfile.open( "ofs_msg_stat.txt", std::ofstream::out | std::ofstream::app); if(!_ofs_msg_outfile.is_open()) { log_error("can't create file-%s", "ofs_msg_stat.txt"); } _ofs_fast_router_stat.open( "ofs_rout_stat.txt", std::ofstream::out | std::ofstream::app); if(!_ofs_fast_router_stat.is_open()) { log_error("can't create file-%s", "ofs_rout_stat.txt"); } else { _ofs_fast_router_stat << "created connections..." << std::endl; _ofs_fast_router_stat << "ind-0: " << _conn_ctx[0].conn_str << std::endl; _ofs_fast_router_stat << "ind-1: " << _conn_ctx[1].conn_str << std::endl; } } void close_stat_files() const { if(_ofs_msg_outfile.is_open()) _ofs_msg_outfile.close(); if(_ofs_fast_router_stat.is_open()) _ofs_fast_router_stat.close(); } void print_best_connection_statistics(size_t connection_index) const { static size_t cntr = 0; static size_t max_ind_prev = ~0; static const size_t connections_max = 4; static std::vector message_by_index (connections_max); if(connection_index >= _connection_cnt) { log_error("wrong connection index detected-%d", connection_index); return; } ++cntr; message_by_index[connection_index]++; // каждые 100 сообщений выводить статистику по лидеру (если он сменился) if(cntr%_ROUTER_LOG_FREQ_1 != 0) return; size_t fast_ind = (message_by_index[0] > message_by_index[1])? 0 : 1; // выводить статистику по лидеру // * в первый раз (после 100 сообщений) // * каждые 100 сообщений но только при смене "лидера" // * каждые 1000 сообщений (даже если он не сменился) if(max_ind_prev == ~0 || fast_ind != max_ind_prev || cntr%_ROUTER_LOG_FREQ_2 == 0) { size_t slow_ind = (fast_ind == 0)? 1 : 0; _ofs_fast_router_stat << "faster router index is " << fast_ind << "(" << _conn_ctx[fast_ind].conn_str << "); "; _ofs_fast_router_stat << "msg stat: " << message_by_index[fast_ind] << "/" << message_by_index[slow_ind] << std::endl; } max_ind_prev = fast_ind; } CG_RESULT OnStreamUniqueMessage(cg_conn_t* conn_not_used, cg_listener_t* listener, struct cg_msg_t* message, void* data, size_t connection_index) override { if(!_ofs_msg_outfile.is_open()) return CG_ERR_INTERNAL; // need for testing only //if(message->type == CG_MSG_P2REPL_LIFENUM) //{ // close_stat_files(); // open_stat_files(); // // return CG_ERR_OK; //} // need for testing only char buf[_MAX_BUF]; const char endl [] = "\r\n"; size_t sz = sizeof(buf) - sizeof(endl); print_best_connection_statistics(connection_index); cg_msg_dump(message, nullptr, buf, &sz); strcat_s(buf, endl); _ofs_msg_outfile << buf << std::endl; return CG_ERR_OK; } private: const size_t _connection_cnt; CONNECTION_CTX* _conn_ctx; mutable std::ofstream _ofs_msg_outfile; mutable std::ofstream _ofs_fast_router_stat; static const size_t _MAX_BUF = 8192; static const size_t _ROUTER_LOG_FREQ_1 = 100; static const size_t _ROUTER_LOG_FREQ_2 = 1000; }; int main(int argc, char* argv[]) { #ifdef _WIN32 SetConsoleCtrlHandler(InterruptHandler, 1); #endif//_WIN32 if (argc < 3 || argc > 4) { printf("Usage:\n\t app_name.exe stream connection_1 connection_2\n\n" "where\n\t \"stream\" - connection creation string is set in the URL format in the following way: \"p2repl://STREAM_NAME[;param1=value1[;param2=value[;...[;paramN=valueN]]]];" "\n\t \"connection_1\" and \"connection_2\" - connection creation string is set in the URL format in the following way: \"TYPE://HOST:PORT;param1=value1;param2=value;...;paramN=valueN\";"); //"\n\t for example: p2repl://FORTS_FUTCOMMON_REPL p2tcp://127.0.0.1:4001;app_name=aggrspy11 p2tcp://127.0.0.1:4003;app_name=aggrspy22."); return 2; } std::string stream(argv[1]); connection_ctx[0].conn_str = argv[2]; if(argc == 4) connection_ctx[1].conn_str = argv[3]; else connection_cnt = 1; // Environment is initialized using provided INI-file CHECK_FAIL(env_open("ini=multi.ini;key=11111111;minloglevel=trace")); try { MultiConnectorChild app(connection_ctx, connection_cnt, stream); while (!bExit) // check global interrupt flag { try { app.run(); } catch(const char* what) { log_info(what); } if(_kbhit()) { bExit = TRUE; } } } catch (std::exception& e) { fprintf(stderr, "Exception: %s", e.what()); } for(size_t i = 0; i < connection_cnt; ++i) { if (connection_ctx[i].conn != NULL) conn_destroy(connection_ctx[i].conn); } // close environment CHECK_FAIL(env_close()); return 0; }