Programming: This forum is for all programming questions.
The question does not have to be directly related to Linux and any language is fair game.
Notices
Welcome to LinuxQuestions.org, a friendly and active Linux Community.
You are currently viewing LQ as a guest. By joining our community you will have the ability to post topics, receive our newsletter, use the advanced search, subscribe to threads and access many other special features. Registration is quick, simple and absolutely free. Join our community today!
Note that registered members see fewer ads, and ContentLink is completely disabled once you log in.
If you have any problems with the registration process or your account login, please contact us. If you need to reset your password, click here.
Having a problem logging in? Please visit this page to clear all LQ-related cookies.
Get a virtual cloud desktop with the Linux distro that you want in less than five minutes with Shells! With over 10 pre-installed distros to choose from, the worry-free installation life is here! Whether you are a digital nomad or just looking for flexibility, Shells can put your Linux machine on the device that you want to use.
Exclusive for LQ members, get up to 45% off per month. Click here for more info.
I am working with a C++ program consisting of two threads. The first thread receives packets through a UDP unicast connection and stores them in a buffer. The second thread reads the packets from the buffer and sends them through a UDP multicast connection. Both use blocking sockets and share a common buffer and a linked list L1, which are protected by mutexes.
The program seemed to work just fine, receiving a packet and sending it almost immediately, but it started giving some trouble recently. The synchronization between the two threads started failing, so I decided to use a non-blocking socket in the sending thread. As a consequence, sendto() now fails in some cases with errno 11 (EAGAIN, "Resource temporarily unavailable"). What could be the cause? How could I fix it? Here is my code:
Code:
/**
 * Transmit thread entry point (pthread start routine).
 *
 * Creates its own UDP socket, switches it to non-blocking mode and then loops
 * forever: under mutex1 it walks the shared list L1 looking for the first
 * node that has payload (Get_tam() != 0) and has not been transmitted yet
 * (Get_NTx() == 0), marks it as transmitted, and sends its payload to the
 * multicast group 224.255.255.3:12900.
 *
 * Fixes with respect to the previous version:
 *  - SO_SNDBUF is a socket-level option, so it is set with SOL_SOCKET
 *    (IPPROTO_IP was silently setting an unrelated IP option).
 *  - O_NONBLOCK is OR-ed into the current flags instead of replacing them.
 *  - EAGAIN/EWOULDBLOCK (errno 11) from sendto() on the non-blocking socket
 *    means "send buffer full right now"; the packet is retried after a short
 *    sleep instead of being dropped.
 *  - The send result is only inspected when a send was actually attempted,
 *    and counters are only updated for packets that really went out.
 *
 * @param Tx  unused; the socket descriptor is taken from the socket created
 *            inside this function.
 * @return    never returns in practice (infinite loop); NULL otherwise.
 */
void * envia(void * Tx) // Sending side: no pacing, packets go out as fast as possible
{
    (void) Tx;

    // Back-off policy when the kernel send buffer is full (EAGAIN).
    const int ESPERA_REINTENTO_US = 100;  // sleep between attempts
    const int MAX_REINTENTOS = 1000;      // give up after roughly 100 ms

    int nd = 0;
    int t = 0, s, l_lista, p, s_num = -1;
    unsigned char *bff = NULL;
    struct timespec valref, valtx;

    GTI_sock cliente;
    int Puerto = 12800; const char *final = "192.168.1.7";
    cliente.create_UDPsocket((char *) final, Puerto); // socket used by this thread for sending
    s = cliente.bind_UDPsocket(); if (s<0) cout<<"Problemas en socket rx";
    cout<<"Conexion envio dentro de envia() con status"<<s<<endl;
    int Desc = cliente.getDescriptor();

    // Add O_NONBLOCK without discarding whatever flags are already set.
    int flags = fcntl(Desc, F_GETFL, 0);
    if (flags < 0 || fcntl(Desc, F_SETFL, flags | O_NONBLOCK) < 0)
        perror("fcntl(O_NONBLOCK)");

    // Request a large send buffer; the kernel may clamp the value.
    int tam = 500000000;
    s = setsockopt(Desc, SOL_SOCKET, SO_SNDBUF, &tam, sizeof(tam));
    if (s<0)
        printf("Problemas al cambiar el tam de buffer");

    // Multicast destination.
    sockaddr_in Direccion2;
    const char *origen = "224.255.255.3";
    memset(&Direccion2, 0, sizeof(Direccion2));
    Direccion2.sin_family = AF_INET;
    Direccion2.sin_port = htons(12900);
    Direccion2.sin_addr.s_addr = inet_addr(origen);

    cout<<"Hilo de transmisión "<<endl;
    clock_gettime(CLOCK_REALTIME, &valref); // first reference instant for the rate estimate

    while(1)
    {
        pthread_mutex_lock(&mutex1);
        nd = Ndata;
        pthread_mutex_unlock(&mutex1);
        if (nd <= 0)
        {
            // Nothing announced as pending: yield briefly. The list is still
            // scanned afterwards, exactly as before.
            usleep(150);
        }

        // Look for the first node that is ready to be transmitted.
        pthread_mutex_lock(&mutex1);
        L1.Empezar();
        l_lista = L1.Get_SLList_size();
        cout<<"Recorro la lista para trx "<<endl;
        s = 0;
        // Checking s first avoids querying the current node of an empty list.
        while ((s < l_lista) && ((L1.Get_NTx() > 0) || (L1.Get_tam() == 0))) {
            L1.Next_Lista();
            s++;
        }
        const bool hay_pkt = (s < l_lista);
        if (hay_pkt) {
            cout<<"s vale "<<s<<", l_lista vale "<<l_lista<<endl;
            t = L1.Get_tam();
            bff = L1.Get_pkt();
            s_num = L1.Get_seq();
            L1.Update_NTx();
            Ndata--; // one packet fewer left to send
        }
        pthread_mutex_unlock(&mutex1);

        if (!hay_pkt)
            continue;

        // NOTE(review): bff points into storage shared with the receive
        // thread and is read here after mutex1 has been released — confirm
        // the receiver cannot overwrite it before the send completes.
        int intentos = 0;
        int err = 0;
        do {
            p = sendto(Desc, bff, t, 0, (struct sockaddr *) &Direccion2,
                       (socklen_t) sizeof(Direccion2));
            if (p >= 0)
                break;
            err = errno; // save before any other call can overwrite it
            if (err != EAGAIN && err != EWOULDBLOCK && err != EINTR)
                break;   // real error: retrying will not help
            usleep(ESPERA_REINTENTO_US);
        } while (++intentos < MAX_REINTENTOS);

        if (p < 0) {
            printf("Fallo al enviar, código %i", err); cout<<endl;
            continue;
        }

        tx = tx + 1;
        cout<<"Enviado pkt "<< s_num<<endl;

        // Instantaneous rate estimate based on the time since the last send.
        clock_gettime(CLOCK_REALTIME, &valtx);
        struct timespec delta = valtx - valref;
        const double transcurrido_ns = delta.tv_sec * 1e9 + delta.tv_nsec;
        if (transcurrido_ns > 0)
            R_tx = t * 1e9 / transcurrido_ns; // bytes/second
        B_Tx = B_Tx + t;
        valref = valtx;
    }

    cout<<"Salida del hilo de transmisión"<<endl;
    return NULL;
}
void * recibe(void * Rx)
{ //RUtina de recepcion, supongo que no hay pérdida de pkts
cout<<"Hilo de recepción"<<endl;
ListaPtr Ptr_temp= (ListaPtr ) malloc(sizeof(ListaPtr));
NAL_head* N_Header= new NAL_head;
Slice_head* S_head= new Slice_head;
int p, stat, lost, l1, l2, rx_num, tam_list, total_lost, GOP_id, s_t=0; //número de GOP
int Ds=((struct Param *) Rx)->Desc;
GTI_sock cliente;
int Puerto= 1270; char *final="138.4.32.29";
cliente.create_UDPsocket((char *) final, Puerto); //Socket para recepción de vídeo
p=cliente.bind_UDPsocket(); if (p<0) cout<<"Problemas en socket rx";
cout<<"Conexion rx dentro de recibe () con status"<<p<<endl;
Ds=cliente.getDescriptor();
int tam=500e6;
R_tot=R_tot/8;//bytes/s
p= setsockopt(cliente.getDescriptor(), IPPROTO_IP, SO_RCVBUF, &tam, sizeof(tam));
if (p<0)
printf("Problemas al cambiar el tam de buffer de rx");
struct timespec valorcontador, valorcontador2, valini, t_s;
long B_Rx=0;
valini.tv_sec=0; valini.tv_nsec=0;//Inicialización
sockaddr_in Dir2=((struct Param *) Rx)->Dir;
unsigned char* bini=((struct Param *) Rx)->buff;
int tambuff=((struct Param *) Rx)->tam_b;
int tambini=tambuff;
int taux=floor(tambuff/100)*99;
valorcontador2=((struct Param *) Rx)->T_dead;//Intervalo de deadline entre la recepcion y el borrado del pkt
unsigned short seq_num;
rx_num=0; total_lost=0;
while(1)
{
p=recvfrom(Ds, buffer, tambuff, 0, NULL, NULL);// (struct sockaddr *) &Dir2, &tamanyo);
B_Rx=B_Rx+p;
if (rx_num==0){
stat=clock_gettime(CLOCK_REALTIME,&valorcontador); //Tomamos el valor del reloj actual
}
pthread_mutex_lock(&mutex1);
seq_num=*((unsigned short *)(buffer+2));// seq_num=get_seq(buffer);
seq_num=ntohs(seq_num);
if((seq_num<(Last_seq))&&((Last_seq-seq_num)<32767))
{
lost=L1.Find_SeqNum(seq_num); //Busca el nodo Lista asociado al numero de secuencia
if ((lost>0)&&(L1.Get_tam()==0)){
cout<<"Recuperado el pkt con número de secuencia "<<seq_num<<endl;
rx_num++;
L1.Set_tam(p);
L1.Set_pkt(buffer);
L1.Set_GOPid(GOP_id);//No es válido, se pueden haber perdido pkts entre GOPs
valini=valorcontador+(get_tstamp(buffer)-t_s);
L1.Set_Time(valini);
}
}
else if((seq_num!=(Last_seq+1))&&(rx_num>0))
{
lost=seq_num-Last_seq;
if (lost<0){ //Se han perdido pkts justo al dar la vuelta
lost=65535-Last_seq+seq_num;
total_lost=total_lost+lost;
cout<<"perdidas de "<<lost<< " paquetes al dar la vuelta"<<endl;
if (lost>0)
cout<<"Se han perdido "<<lost <<" paquetes al dar la vuelta"<<endl;
lost=Last_seq;
//valini=valorcontador+(get_tstamp(buffer)-t_s);
valorcontador=valini+(get_tstamp(buffer)-t_s);
while (lost<65535){
lost++;
L1.AddLista(0, (unsigned short) lost, GOP_id, 152, t_s, NULL, valorcontador);
}
lost=0;
while (lost<seq_num){
L1.AddLista(0, (unsigned short) lost, GOP_id, 152, t_s, NULL, valorcontador);
lost++;
}
Last_seq=seq_num;
rx_num++;
L1.AddLista(p, (unsigned short) lost, GOP_id, s_t, t_s, buffer, valorcontador);
cout<<"Guardado el pkt "<<seq_num<<" al dar la vuelta"<<endl;
Ndata++;
cout<<"Llevamos perdidos "<< total_lost<< " paquetes"<<endl;
}else{
lost=Last_seq;
cout<<"Se han perdido "<< seq_num-lost-1 <<" paquetes de forma normal"<<endl;
total_lost=total_lost+seq_num-lost-1;
valorcontador=valini+(get_tstamp(buffer)-t_s);
while (lost<(seq_num-1)){
lost++;
valorcontador=valini+(get_tstamp(buffer)-t_s);
L1.AddLista(0, (unsigned short) lost, GOP_id, 152, t_s, NULL, valorcontador);
}
Last_seq=seq_num;
rx_num++;
valorcontador=valini+(get_tstamp(buffer)-t_s);
L1.AddLista(p, (unsigned short) seq_num, GOP_id, s_t, t_s, buffer, valorcontador);
Ndata++;
}
cout<<"Llevamos perdidos "<< total_lost<< " paquetes"<<endl;
}else{
Last_seq=seq_num;
if(rx_num==0){//Primer pkt recibido correctamente
cout<<"Recibido el primer paquete, con secuencia "<< seq_num<<endl;
valorcontador=valorcontador+valorcontador2;
valini=valorcontador;
t_s=get_tstamp(buffer);
rx_num++;
}else{
valorcontador=valini+(get_tstamp(buffer)-t_s);
rx_num++;
}
L1.AddLista(p, (unsigned short) seq_num, GOP_id, s_t, t_s, buffer, valorcontador);
Ndata++;
}
if (((int) (buffer+p-bini))>taux){
buffer=bini;
tambuff=tambini;
cout<<"Reinicio del Buffer Circular"<<endl;
}else{
buffer=buffer+p;
tambuff=tambuff-p;
}
pthread_mutex_unlock(&mutex1);
cout<<"Recibido pkt "<< seq_num<<endl;//<<" y la secuencia anterior es "<< Last_seq << endl;
}
cout<<"Salida del hilo de recepción"<<endl;
}
Last edited by kurtwagner; 05-07-2009 at 02:34 AM.
LinuxQuestions.org is looking for people interested in writing
Editorials, Articles, Reviews, and more. If you'd like to contribute
content, let us know.