LinuxQuestions.org
Old 05-06-2009, 05:55 AM   #1
kurtwagner
LQ Newbie
 
Registered: May 2009
Posts: 2

Rep: Reputation: 0
Question C++ Multicast UDP send with errno 11


Hi,

I am working on a C++ program consisting of two threads. The first thread receives packets through a UDP unicast connection and stores them in a buffer. The second thread reads the packets from the buffer and sends them through a UDP multicast connection. Both use blocking sockets and share a common buffer and a linked list, L1, which are protected by mutexes.

The program seemed to work just fine, receiving a packet and sending it almost immediately, but it recently started giving trouble. The synchronization between the two threads started failing, so I decided to use a non-blocking socket in the sending thread. As a consequence, sendto() now fails in some cases with errno 11 (EAGAIN, "Resource temporarily unavailable"). What could be the cause? How could I fix it? Here is my code:


Code:
void * envia(void * Tx) // Sending thread: no pacing, I send as fast as I can
{     
        int nd=0;           
        int t,s,l_lista,p, s_num=0;
        unsigned char* bff;        
        struct timespec valref, valtx;
        int Desc=((struct Param *) Tx)->Desc;
        
        GTI_sock cliente;
        int Puerto= 12800; char *final="192.168.1.7";
        cliente.create_UDPsocket((char *) final, Puerto); // Socket used for sending the video
        s=cliente.bind_UDPsocket(); if (s<0) cout<<"Problem with the tx socket";
        cout<<"Send connection inside envia() with status "<<s<<endl;
        Desc=cliente.getDescriptor();
        fcntl(Desc, F_SETFL, O_NONBLOCK);
        int tam=500e6;
        // SO_SNDBUF is a socket-level option, so the level must be SOL_SOCKET, not IPPROTO_IP
        s= setsockopt(cliente.getDescriptor(), SOL_SOCKET, SO_SNDBUF, &tam, sizeof(tam));
          if (s<0)
              printf("Problem changing the buffer size");
        
        sockaddr_in Direccion2;
        char *origen="224.255.255.3";
        Direccion2.sin_family=AF_INET;
	Direccion2.sin_port=htons(12900);
	Direccion2.sin_addr.s_addr=inet_addr((char *) origen);
        memset(&(Direccion2.sin_zero),'\0',8);
        
        cout<<"Transmission thread "<<endl;
        t=clock_gettime(CLOCK_REALTIME, &valref); // First reference instant
        s_num=-1;
        while(1)      
        {
      
        pthread_mutex_lock(&mutex1);
        nd=Ndata;  
       pthread_mutex_unlock(&mutex1);
        if (nd<=0)
        {
           usleep(150);
         }
           s=0;    
          // cout<<"Before the lock"<<endl;      
            pthread_mutex_lock(&mutex1);                
            L1.Empezar();
            l_lista=L1.Get_SLList_size();
            cout<<"Walking the list for tx "<<endl;
            while((L1.Get_NTx()>0)||(L1.Get_tam()==0)){
                L1.Next_Lista(); 
                s++;   
                if(s>=l_lista){  
                    break;
                }
            }
            if (s<l_lista){
               cout<<"s is "<<s<<", l_lista is "<<l_lista<<endl;
                t=L1.Get_tam();
              bff=L1.Get_pkt();
              s_num=L1.Get_seq();
              //cout<<"Time to transmit"<<endl;
              L1.Update_NTx();  
              Ndata--; // one fewer left to send
  }
            pthread_mutex_unlock(&mutex1);
            if (s<l_lista){
                p=sendto(Desc, bff, t, 0, (struct sockaddr *)&Direccion2, (socklen_t) sizeof(struct sockaddr));
                if (p<0){
                    // On Linux errno 11 is EAGAIN: the non-blocking socket's send buffer is full
                    printf("Send failed, code %i", errno);cout<<endl;
                }else{
                   tx=tx+1;  
                   cout<<"Sent pkt "<< s_num<<endl;
                   s=clock_gettime(CLOCK_REALTIME, &valtx);
                   valref=valtx-valref;       
                   R_tx=valref.tv_sec*1e9+valref.tv_nsec;
                   R_tx=t*1e9/R_tx;//bytes/second
                   B_Tx=B_Tx+t;
                   valref=valtx;               
                }
            }
        }
        cout<<"Exiting the transmission thread"<<endl;
        return NULL;
}

void * recibe(void * Rx)
{     // Receive routine; I assume no pkts are lost
        cout<<"Reception thread"<<endl;
        ListaPtr Ptr_temp= (ListaPtr ) malloc(sizeof(ListaPtr));
        NAL_head* N_Header= new NAL_head;
        Slice_head* S_head=  new Slice_head; 
        int p, stat, lost, l1, l2, rx_num, tam_list, total_lost, GOP_id, s_t=0; // GOP number
        int Ds=((struct Param *) Rx)->Desc; 
        
        GTI_sock cliente;
        int Puerto= 1270; char *final="138.4.32.29";
        cliente.create_UDPsocket((char *) final, Puerto); // Socket for video reception
        p=cliente.bind_UDPsocket(); if (p<0) cout<<"Problem with the rx socket";
        cout<<"Rx connection inside recibe() with status "<<p<<endl;
        Ds=cliente.getDescriptor();        
         int tam=500e6;        
        R_tot=R_tot/8;//bytes/s
        // SO_RCVBUF is a socket-level option, so the level must be SOL_SOCKET, not IPPROTO_IP
        p= setsockopt(cliente.getDescriptor(), SOL_SOCKET, SO_RCVBUF, &tam, sizeof(tam));
          if (p<0)
              printf("Problem changing the rx buffer size");
        
        struct timespec valorcontador, valorcontador2, valini, t_s;
        long B_Rx=0;
        valini.tv_sec=0; valini.tv_nsec=0;// Initialization
        sockaddr_in Dir2=((struct Param *) Rx)->Dir;
        unsigned char* bini=((struct Param *) Rx)->buff;
	int tambuff=((struct Param *) Rx)->tam_b;
        int tambini=tambuff;
        int taux=floor(tambuff/100)*99;
        valorcontador2=((struct Param *) Rx)->T_dead;// Deadline interval between receiving a pkt and deleting it
        unsigned short seq_num;              
        rx_num=0;  total_lost=0;
        while(1)      
        {         

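            p=recvfrom(Ds, buffer, tambuff, 0, NULL, NULL);// (struct sockaddr *) &Dir2, &tamanyo);
            if (p<0){ // recvfrom() failed: skip this iteration instead of using p as a length
                perror("recvfrom");
                continue;
            }
            B_Rx=B_Rx+p;
            if (rx_num==0){
                stat=clock_gettime(CLOCK_REALTIME,&valorcontador); // Take the current clock value
            }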
            pthread_mutex_lock(&mutex1);
           seq_num=*((unsigned short *)(buffer+2));// seq_num=get_seq(buffer);        
           seq_num=ntohs(seq_num);
            if((seq_num<(Last_seq))&&((Last_seq-seq_num)<32767))  
            {  
                lost=L1.Find_SeqNum(seq_num);  // Find the list node associated with the sequence number
                if ((lost>0)&&(L1.Get_tam()==0)){  
                    cout<<"Recovered the pkt with sequence number "<<seq_num<<endl; 
                    rx_num++; 
                   L1.Set_tam(p);
                    L1.Set_pkt(buffer);
                    L1.Set_GOPid(GOP_id);// Not valid: pkts may have been lost between GOPs
                    valini=valorcontador+(get_tstamp(buffer)-t_s);
                    L1.Set_Time(valini);
                }
            }
            else if((seq_num!=(Last_seq+1))&&(rx_num>0)) 
            {
                lost=seq_num-Last_seq;
                if (lost<0){  // Pkts were lost right at the sequence-number wrap-around
                     lost=65535-Last_seq+seq_num;
                     total_lost=total_lost+lost;
                     cout<<"losses of "<<lost<< " packets at the wrap-around"<<endl;
                     if (lost>0)
                         cout<<lost <<" packets have been lost at the wrap-around"<<endl;
                     lost=Last_seq;
                     //valini=valorcontador+(get_tstamp(buffer)-t_s);
                     valorcontador=valini+(get_tstamp(buffer)-t_s);
                     while (lost<65535){
                        lost++; 
                     L1.AddLista(0,  (unsigned short) lost, GOP_id, 152, t_s, NULL, valorcontador);  
                     }
                     lost=0;
                     while (lost<seq_num){  
  L1.AddLista(0,  (unsigned short) lost, GOP_id, 152, t_s, NULL, valorcontador);
                   lost++;                        
                    }
                     Last_seq=seq_num;
                          rx_num++; 
                      L1.AddLista(p,  (unsigned short) lost, GOP_id, s_t, t_s, buffer, valorcontador);            
                      cout<<"Stored pkt "<<seq_num<<" at the wrap-around"<<endl;                      
                     Ndata++; 
                      cout<<"Total packets lost so far: "<< total_lost<<endl;
                }else{
                     lost=Last_seq;             
                    cout<<"Lost "<< seq_num-lost-1 <<" packets in the normal way"<<endl;   
                    total_lost=total_lost+seq_num-lost-1;
                    valorcontador=valini+(get_tstamp(buffer)-t_s);
                    while (lost<(seq_num-1)){                        
                        lost++;                    
                        valorcontador=valini+(get_tstamp(buffer)-t_s);
                        L1.AddLista(0,  (unsigned short) lost, GOP_id, 152, t_s, NULL, valorcontador);              
                   }
                    Last_seq=seq_num;
                    rx_num++; 
                    valorcontador=valini+(get_tstamp(buffer)-t_s);             
                    L1.AddLista(p, (unsigned short) seq_num,  GOP_id, s_t, t_s, buffer, valorcontador); 
                    Ndata++;  
                }
                 cout<<"Total packets lost so far: "<< total_lost<<endl;
            }else{
                Last_seq=seq_num;
                if(rx_num==0){// First pkt received correctly
                    cout<<"Received the first packet, with sequence "<< seq_num<<endl;
                    valorcontador=valorcontador+valorcontador2;
                    valini=valorcontador;
                    t_s=get_tstamp(buffer);
                    rx_num++; 
                }else{                 
                    valorcontador=valini+(get_tstamp(buffer)-t_s);                    
                    rx_num++; 
                } 
                L1.AddLista(p, (unsigned short) seq_num, GOP_id, s_t, t_s, buffer, valorcontador); 
                  Ndata++;  

            } 
            if (((int) (buffer+p-bini))>taux){
                buffer=bini;
                tambuff=tambini;
                cout<<"Circular buffer restart"<<endl;     

                }else{
                    buffer=buffer+p; 
                    tambuff=tambuff-p;
                }   
           pthread_mutex_unlock(&mutex1);  
          cout<<"Received pkt "<< seq_num<<endl;//<<" and the previous sequence is "<< Last_seq << endl;

        }
        cout<<"Exiting the reception thread"<<endl;
}

Last edited by kurtwagner; 05-07-2009 at 02:34 AM.
 
Old 05-06-2009, 07:06 AM   #2
David1357
Senior Member
 
Registered: Aug 2007
Location: South Carolina, U.S.A.
Distribution: Ubuntu, Fedora Core, Red Hat, SUSE, Gentoo, DSL, coLinux, uClinux
Posts: 1,302
Blog Entries: 1

Rep: Reputation: 107
Quote:
Originally Posted by kurtwagner
Here is my code
You might want to resubmit your question with your code inside [CODE] tags. For example:
Code:
#include <stdio.h>

int main(void)
{
    printf("Some formatted code...\n");
}
 
Old 05-06-2009, 08:20 AM   #3
dwhitney67
Senior Member
 
Registered: Jun 2006
Location: Maryland
Distribution: Kubuntu, Fedora, RHEL
Posts: 1,541

Rep: Reputation: 335
@ OP -

You may want to take into consideration that std::cout is not thread safe; I believe the same holds true for printf(), but I may be wrong.

It is hard to read your code w/o the code-block formatting; perhaps you will take the time to edit your post.
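
On the logging point, a hedged note: POSIX requires stdio calls such as printf() to lock the stream for the duration of each call, and since C++11 concurrent use of std::cout is not a data race, but output written with several chained << operators from two threads can still interleave. A minimal sketch of one way to keep each log line intact, assuming a C++11 compiler; `log_mutex` and `log_line` are hypothetical names, not part of the posted program:

```cpp
#include <cassert>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>

// Hypothetical helper (not from the original post): one mutex guards all
// log output so that a whole line is written before another thread can write.
std::mutex log_mutex;

void log_line(const std::string& msg, std::ostream& out = std::cout)
{
    std::lock_guard<std::mutex> guard(log_mutex); // released when guard goes out of scope
    out << msg << '\n';
}
```

In the posted threads, a call such as `log_line("Received pkt " + std::to_string(seq_num));` could replace the chained `cout<<...<<endl` statements. It would be best to make such calls after releasing mutex1, so that console I/O does not lengthen the time the shared list stays locked.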
 
Old 05-06-2009, 09:58 AM   #4
David1357
Senior Member
 
Registered: Aug 2007
Location: South Carolina, U.S.A.
Distribution: Ubuntu, Fedora Core, Red Hat, SUSE, Gentoo, DSL, coLinux, uClinux
Posts: 1,302
Blog Entries: 1

Rep: Reputation: 107
Quote:
Originally Posted by kurtwagner
pthread_mutex_lock(&mutex1);
This function can fail. You should check the return value and add error handling code.
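
A minimal sketch of what such a check could look like. Note that pthread functions return the error number directly rather than setting errno. `lock_checked` is a hypothetical wrapper name, not something from the posted code:

```cpp
#include <cassert>
#include <cstdio>
#include <cstring>
#include <pthread.h>

// Hypothetical wrapper: returns true if the mutex was locked.
// pthread_mutex_lock() reports failure through its return value, not errno.
bool lock_checked(pthread_mutex_t* m)
{
    assert(m != NULL);
    int rc = pthread_mutex_lock(m);
    if (rc != 0) {
        std::fprintf(stderr, "pthread_mutex_lock failed: %s\n", std::strerror(rc));
        return false;
    }
    return true;
}
```

A caller would then write something like `if (!lock_checked(&mutex1)) break;` instead of the bare `pthread_mutex_lock(&mutex1);`, and check `pthread_mutex_unlock()` the same way.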

Quote:
Originally Posted by kurtwagner
p=recvfrom(Ds, buffer, tambuff, 0, NULL, NULL);// (struct sockaddr *) &Dir2, &tamanyo);
This function can fail. You should check the return value and add error handling code.
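
This matters in the posted loop because the return value p is later added to the byte counter and to the buffer pointer, so a -1 would silently corrupt both. A sketch of a checked receive, assuming a POSIX system; `recv_checked` is a hypothetical helper name:

```cpp
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <sys/socket.h>
#include <sys/types.h>

// Hypothetical helper: receives one datagram into buf.
// Returns the number of bytes received, or -1 on a real error.
// A call interrupted by a signal (EINTR) is simply retried.
ssize_t recv_checked(int fd, unsigned char* buf, size_t len)
{
    assert(buf != NULL);
    for (;;) {
        ssize_t n = recvfrom(fd, buf, len, 0, NULL, NULL);
        if (n >= 0)
            return n;
        if (errno == EINTR)
            continue;
        std::perror("recvfrom");
        return -1;
    }
}
```

The receive thread could then do `p = recv_checked(Ds, buffer, tambuff); if (p < 0) continue;` before touching any of the counters or the list.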

After you have added error checking to all the functions that can return error values, you will probably find your problem.
 
Old 05-07-2009, 02:41 AM   #5
kurtwagner
LQ Newbie
 
Registered: May 2009
Posts: 2

Original Poster
Rep: Reputation: 0
Thanks for the advice, I will try it all and get back to you.
 
  

