mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-31 01:43:02 +08:00
Merge pull request #525 from pijyoi/master
remote_thr.cpp: port pthreads usage to win32 api
This commit is contained in:
commit
be1160832e
31
builds/mingw32/Makefile.mingw32
Normal file
31
builds/mingw32/Makefile.mingw32
Normal file
@ -0,0 +1,31 @@
|
||||
CC=gcc
|
||||
CFLAGS=-Wall -Os -g -DDLL_EXPORT -DFD_SETSIZE=1024 -I.
|
||||
LIBS=-lws2_32
|
||||
|
||||
OBJS = address.o clock.o ctx.o dealer.o decoder.o devpoll.o dist.o encoder.o epoll.o err.o fq.o \
|
||||
io_object.o io_thread.o ip.o ipc_address.o ipc_connecter.o ipc_listener.o kqueue.o lb.o \
|
||||
mailbox.o msg.o mtrie.o object.o options.o own.o pair.o pgm_receiver.o pgm_sender.o \
|
||||
pgm_socket.o pipe.o poll.o poller_base.o precompiled.o proxy.o pub.o pull.o push.o \
|
||||
random.o raw_decoder.o raw_encoder.o reaper.o rep.o req.o router.o select.o session_base.o \
|
||||
signaler.o socket_base.o stream_engine.o sub.o tcp.o tcp_address.o tcp_connecter.o tcp_listener.o \
|
||||
thread.o trie.o v1_decoder.o v1_encoder.o xpub.o xsub.o zmq.o zmq_utils.o
|
||||
|
||||
%.o: ../../src/%.cpp
|
||||
$(CC) -c -o $@ $< $(CFLAGS)
|
||||
|
||||
%.o: ../../perf/%.cpp
|
||||
$(CC) -c -o $@ $< $(CFLAGS)
|
||||
|
||||
all: libzmq.dll
|
||||
|
||||
perf: inproc_lat.exe inproc_thr.exe local_lat.exe local_thr.exe remote_lat.exe remote_thr.exe
|
||||
|
||||
libzmq.dll: $(OBJS)
|
||||
g++ -shared -o $@ $^ -Wl,--out-implib,$@.a $(LIBS)
|
||||
|
||||
%.exe: %.o libzmq.dll
|
||||
g++ -o $@ $^
|
||||
|
||||
clean:
|
||||
del *.o *.a *.dll *.exe
|
||||
|
32
builds/mingw32/platform.hpp
Normal file
32
builds/mingw32/platform.hpp
Normal file
@ -0,0 +1,32 @@
|
||||
/*
|
||||
Copyright (c) 2007-2011 iMatix Corporation
|
||||
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __ZMQ_PLATFORM_HPP_INCLUDED__
|
||||
#define __ZMQ_PLATFORM_HPP_INCLUDED__
|
||||
|
||||
// This is the platform definition for the MSVC platform.
|
||||
// As a first step of the build process it is copied to
|
||||
// zmq directory to take place of platform.hpp generated from
|
||||
// platform.hpp.in on platforms supported by GNU autotools.
|
||||
// Place any MSVC-specific definitions here.
|
||||
|
||||
#define ZMQ_HAVE_WINDOWS
|
||||
|
||||
#endif
|
@ -19,8 +19,8 @@
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "zmq.h"
|
||||
#include "zmq_utils.h"
|
||||
#include "../include/zmq.h"
|
||||
#include "../include/zmq_utils.h"
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
@ -19,8 +19,8 @@
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "zmq.h"
|
||||
#include "zmq_utils.h"
|
||||
#include "../include/zmq.h"
|
||||
#include "../include/zmq_utils.h"
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
@ -28,7 +28,15 @@
|
||||
#include <time.h>
|
||||
#include <limits.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#include "platform.hpp"
|
||||
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
#include <windows.h>
|
||||
#include <process.h>
|
||||
#else
|
||||
#include <pthread.h>
|
||||
#endif
|
||||
|
||||
#define ZMSG 1
|
||||
#define DATA 0
|
||||
@ -82,18 +90,22 @@ void my_free (void *data, void *hint)
|
||||
//free (data);
|
||||
}
|
||||
|
||||
static void *worker_routine (void *ctx) {
|
||||
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
static unsigned int __stdcall worker_routine (void *ctx)
|
||||
#else
|
||||
static void *worker_routine (void *ctx)
|
||||
#endif
|
||||
{
|
||||
int rc,i;
|
||||
void *buf = NULL;
|
||||
|
||||
if( !(buf = malloc( message_size))){ perror("malloc"); return NULL;}
|
||||
if( !(buf = malloc( message_size))){ perror("malloc"); return 0;}
|
||||
|
||||
void *s = zmq_socket (ctx, flow);
|
||||
|
||||
if (!s) {
|
||||
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Add your socket options here.
|
||||
@ -105,44 +117,44 @@ static void *worker_routine (void *ctx) {
|
||||
rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
sndbuflen = 1;
|
||||
rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
sndbuflen = 2;
|
||||
rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
sndbuflen = 2;
|
||||
rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024);
|
||||
@ -151,7 +163,7 @@ static void *worker_routine (void *ctx) {
|
||||
rc = zmq_connect (s, connect_to);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
printf("%sING %s...\n", flow == ZMQ_PUSH ? "PUSH":"PULL", rec ? "ZMQ_MSG":"DATA");
|
||||
@ -167,13 +179,13 @@ static void *worker_routine (void *ctx) {
|
||||
rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
rc = zmq_msg_send( &msg, s, 0);
|
||||
if (rc < 0) {
|
||||
printf ("error in zmq_send: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
rc = zmq_msg_close (&msg);
|
||||
@ -189,7 +201,7 @@ static void *worker_routine (void *ctx) {
|
||||
rc = zmq_send( s, buf, message_size, 0);
|
||||
if (rc < 0) {
|
||||
printf ("error in zmq_send: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
}}
|
||||
|
||||
@ -203,13 +215,13 @@ static void *worker_routine (void *ctx) {
|
||||
rc = zmq_msg_init (&msg);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
for (i = 0; i != message_count; i++) {
|
||||
rc = zmq_msg_recv (&msg, s, 0);
|
||||
if (rc < 0) {
|
||||
printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
}}
|
||||
|
||||
@ -219,7 +231,7 @@ static void *worker_routine (void *ctx) {
|
||||
rc = zmq_recv( s, buf, message_size, 0);
|
||||
if (rc < 0) {
|
||||
printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
}}
|
||||
}
|
||||
@ -227,12 +239,12 @@ static void *worker_routine (void *ctx) {
|
||||
rc = zmq_close (s);
|
||||
if (rc != 0) {
|
||||
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
free( buf);
|
||||
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main (int argc, char *argv [])
|
||||
@ -240,7 +252,6 @@ int main (int argc, char *argv [])
|
||||
void *ctx;
|
||||
int rc;
|
||||
int i;
|
||||
void *p;
|
||||
|
||||
if (argc != 10) {
|
||||
printf ("usage: remote_thr <connect-to> <message-size> <message-count> <SND buffer> <RCV buffer> <flow (PUSH/PULL)> <records (ZMSG/DATA)> <zmq-threads> <workers>\n");
|
||||
@ -280,21 +291,34 @@ int main (int argc, char *argv [])
|
||||
return -1;
|
||||
}
|
||||
|
||||
printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers);
|
||||
pthread_t worker[128];
|
||||
printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers);
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
HANDLE worker[128];
|
||||
#else
|
||||
pthread_t worker[128];
|
||||
#endif
|
||||
|
||||
US_TIMER timer;
|
||||
|
||||
tm_init( &timer);
|
||||
|
||||
for (i = 0; i < workers; i++) {
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
worker[i] = (HANDLE) _beginthreadex (NULL, 0, worker_routine, ctx, 0 , NULL);
|
||||
#else
|
||||
pthread_create (&worker[i], NULL, worker_routine, ctx);
|
||||
printf("Worker %d spawned\n", i);
|
||||
#endif
|
||||
printf("Worker %d spawned\n", i);
|
||||
}
|
||||
|
||||
for (i = 0; i < workers; i++) {
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
WaitForSingleObject (worker[i], INFINITE);
|
||||
CloseHandle (worker[i]);
|
||||
#else
|
||||
pthread_join( worker[i], &p);
|
||||
printf("Worker %d joined\n", i);
|
||||
#endif
|
||||
printf("Worker %d joined\n", i);
|
||||
}
|
||||
|
||||
float secs = tm_secs( &timer);
|
||||
|
Loading…
x
Reference in New Issue
Block a user