#include #include #include #include #include #include #include #include #include const int MAX_CLIENTS = 10; const int BUFFER_SIZE = 1024; const int NUM_WORKER_THREADS = 4; // Number of worker threads std::atomic running{true}; struct Client { int socket; sockaddr_in address; char buffer[BUFFER_SIZE]; bool connected; }; std::vector clients; std::vector rings; // Vector of io_uring instances void handleClient(int threadId) { io_uring& ring = rings[threadId]; ssize_t bytesRead; io_uring_sqe* sqe; io_uring_cqe* cqe; Client& client = clients[threadId]; // Prepare the receive operation io_uring_prep_recv(sqe, client.socket, client.buffer, BUFFER_SIZE, 0); io_uring_sqe_set_data(sqe, &client); io_uring_submit(&ring); while (running && client.connected) { io_uring_submit_and_wait(&ring, 1); io_uring_peek_cqe(&ring, &cqe); if (cqe->res == -EAGAIN) continue; bytesRead = cqe->res; if (bytesRead <= 0) break; std::string message(client.buffer, bytesRead); // Process the received message here // Prepare for the next receive operation io_uring_prep_recv(sqe, client.socket, client.buffer, BUFFER_SIZE, 0); io_uring_submit(&ring); io_uring_cqe_seen(&ring, cqe); } // Client disconnected close(client.socket); client.connected = false; } int main() { int serverSocket; sockaddr_in serverAddress{}; if ((serverSocket = socket(AF_INET, SOCK_STREAM, 0)) == 0) { std::cerr << "Failed to create socket." << std::endl; return 1; } serverAddress.sin_family = AF_INET; serverAddress.sin_addr.s_addr = INADDR_ANY; serverAddress.sin_port = htons(8888); if (bind(serverSocket, (struct sockaddr*)&serverAddress, sizeof(serverAddress)) < 0) { std::cerr << "Failed to bind." << std::endl; return 1; } if (listen(serverSocket, MAX_CLIENTS) < 0) { std::cerr << "Failed to listen." << std::endl; return 1; } std::vector workerThreads; for (int i = 0; i < NUM_WORKER_THREADS; ++i) { io_uring ring; io_uring_queue_init(256, &ring, 0); rings.push_back(ring); workerThreads.emplace_back([i]() { while (running) { io_uring_submit_and_wait(&rings[i], 1); // handleClient logic for worker thread i } }); } std::cout << "Server started. Listening for incoming connections..." << std::endl; std::thread listenerThread([&serverSocket]() { while (running) { int clientSocket = accept(serverSocket, nullptr, nullptr); if (clientSocket > 0) { Client client{clientSocket, {}, {}, true}; clients.push_back(client); // Determine the ring index for the client int ringIndex = clients.size() % NUM_WORKER_THREADS; io_uring_sqe* sqe = io_uring_get_sqe(&rings[ringIndex]); io_uring_prep_poll_add(sqe, clientSocket, POLLIN); io_uring_submit(&rings[ringIndex]); } } }); std::string command; while (std::getline(std::cin, command)) { if (command == "exit") { running = false; break; } } close(serverSocket); for (auto& ring : rings) { io_uring_queue_exit(&ring); } for (auto& thread : workerThreads) { thread.join(); } listenerThread.join(); return 0; }