VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Objective-C编程 >
  • 大量数据转录的多线程和同步处理的实现

制作者:剑锋冷月 单位:无忧统计网,www.51stat.net
  项目中需要对两个不同格式的存储设备进行数据转录,因为数据量非常大,所以时间非常缓慢;解决方案是使用ReaderWriterSlim类建立一个共享的同步数据,可以支持一个线程读取外部设备,向同步数据写入;多个线程从同步数据中读取,转换格式,然后写入到本地设备。
  本例中采用Queue<T>作为存放数据的集合,写入线程向它的尾部写入对象,读取线程从它的头部获取对象。
  需要注意的是,由于Queue会抛弃已处理的对象,所以在同步数据队列中无法验证数据对象的唯一性,被写入的数据库需要去掉唯一约束,或在写入时向数据库请求验证。
  首先定义一个读写接口:
namespaceCommon
{
  publicinterfaceIReaderWriter<T>
  {
    TRead(intargument);
    voidWrite(intarugment,Tinstance);
    voidDelete(intargument);
    voidClear();
  }
}
  然后实现一个队列的读写器:
namespaceCommon
{
  publicclassQueueReaderWriter<T>:IReaderWriter<T>
  {
    privateQueue<T>queues;
    publicQueueReaderWriter()
    {
      queues=newQueue<T>();
    }
    publicQueueReaderWriter(intcapacity)
    {
      queues=newQueue<T>(capacity);
    }
    #regionIReadWrite<T>成员
    publicTRead(intargument)
    {
      returnqueues.FirstOrDefault();
    }
    publicvoidWrite(intarugment,Tinstance)
    {
      queues.Enqueue(instance);
    }
    publicvoidDelete(intargument)
    {
      queues.Dequeue();
    }
    publicvoidClear()
    {
      queues.Clear();
      queues.TrimExcess();
    }
    #endregion
  }
}
  使用ReaderWriterLockSlim实现同步数据类:
namespaceCommon
{
  publicclassSynchronizedWriteData<T>:IDisposable
  {
    privateReaderWriterLockSlim_dataLock=newReaderWriterLockSlim();
    privateIReaderWriter<T>_innerData;
    privateSynchronizedWriteData()
    {}
    publicSynchronizedWriteData(IReaderWriter<T>innerData)
    {
      _innerData=innerData;
    }
    publicTRead()
    {
      _dataLock.EnterReadLock();
      try
      {
        return_innerData.Read(0);
      }
      finally
      {
        _dataLock.ExitReadLock();
      }
    }
    publicTRead(intargument)
    {
      _dataLock.EnterReadLock();
      try
      {
        return_innerData.Read(argument);
      }
      finally
      {
        _dataLock.ExitReadLock();
      }
    }
    publicvoidAdd(Tinstance)
    {
      _dataLock.EnterWriteLock();
      try
      {
        _innerData.Write(0,instance);
      }
      finally
      {
        _dataLock.ExitWriteLock();
      }
    }
    publicvoidAdd(intargument,Tinstance)
    {
      _dataLock.EnterWriteLock();
      try
      {
        _innerData.Write(argument,instance);
      }
      finally
      {
        _dataLock.ExitWriteLock();
      }
    }
    publicboolAddWithTimeout(Tinstance,inttimeout)
    {
      if(_dataLock.TryEnterWriteLock(timeout))
      {
        try
        {
          _innerData.Write(0,instance);
        }
        finally
        {
          _dataLock.ExitWriteLock();
        }
        returntrue;
      }
      else
      {
        returnfalse;
      }
    }
    publicboolAddWithTimeout(intargument,Tinstance,inttimeout)
    {
      if(_dataLock.TryEnterWriteLock(timeout))
      {
        try
        {
          _innerData.Write(argument,instance);
        }
        finally
        {
          _dataLock.ExitWriteLock();
        }
        returntrue;
      }
      else
      {
        returnfalse;
      }
    }
    publicvoidDelete()
    {
      _dataLock.EnterWriteLock();
      try
      {
        _innerData.Delete(0);
      }
      finally
      {
        _dataLock.ExitWriteLock();
      }
    }
    publicvoidDelete(intargument)
    {
      _dataLock.EnterWriteLock();
      try
      {
        _innerData.Delete(argument);
      }
      finally
      {
        _dataLock.ExitWriteLock();
      }
    }
    #regionIDisposable成员
    publicvoidDispose()
    {
      try
      {
        _dataLock.EnterWriteLock();
        {
          try
          {
            _innerData.Clear();
          }
          finally
          {
            _dataLock.ExitWriteLock();
          }
        }
      }
      finally
      {
        _dataLock.Dispose();
      }
    }
    #endregion
  }
}
  
namespaceExternalDataHandle
{
  ///<summary>
  ///从外部数据源获取到内部数据源的适配器抽象类
  ///</summary>
  ///<typeparamname="T">T数据对象类型</typeparam>
  publicabstractclassExternalDataAdapter<T>:IDisposable
  {
    ///<summary>
    ///外部数据源连接字符串
    ///</summary>
    protectedabstractstringConnectString{get;}
    ///<summary>
    ///提供初始化数据适配器的方法
    ///</summary>
    protectedabstractvoidInitialize();
    ///<summary>
    ///提供数据传递的方法
    ///</summary>
    publicabstractvoidTransmit();
    ///<summary>
    ///提供从外部数据设备读取数据的方法
    ///</summary>
    protectedabstractvoidReadFromExternalDevice();
    ///<summary>
    ///提供保存数据到内部设备的方法
    ///</summary>
    protectedabstractvoidSaveToInternalDevice();
    #regionIDisposable成员
    publicabstractvoidDispose();
    #endregion
  }
}
  多线程数据转录类,本例只使用了一个读取线程:
namespaceExternalDataHandle
{
  ///<summary>
  ///提供多线程方式从外部数据源获取到内部数据源的适配器类
  ///</summary>
  ///<typeparamname="T"></typeparam>
  publicabstractclassMultiThreadAdapter<T>:ExternalDataAdapter<T>
  {
    protectedSynchronizedWriteData<T>_data;
    protectedThread_readThread;
    protectedabstractoverridestringConnectString{get;}
    protectedabstractoverridevoidInitialize();
    publicsealedoverridevoidTransmit()
    {
      _readThread=newThread(newThreadStart(ReadFromExternalDevice));
      _readThread.Start();
      Thread.Sleep(10000);
      while(_readThread.IsAlive)
      {
        SaveToInternalDevice();
      }
      _readThread.Join();
    }
    protectedabstractoverridevoidReadFromExternalDevice();
    protectedabstractoverridevoidSaveToInternalDevice();
    publicoverridevoidDispose()
    {
      if(_data!=null)
      {
        _data.Dispose();
      }
    }
  }
}
  使用ReaderWriterLockSlim实现同步数据类:
namespaceCommon
{
  publicclassSynchronizedWriteData<T>:IDisposable
  {
    privateReaderWriterLockSlim_dataLock=newReaderWriterLockSlim();
    privateIReaderWriter<T>_innerData;
    privateSynchronizedWriteData()
    {}
    publicSynchronizedWriteData(IReaderWriter<T>innerData)
    {
      _innerData=innerData;
    }
    publicTRead()
    {
      _dataLock.EnterReadLock();
      try
      {
        return_innerData.Read(0);
      }
      finally
      {
        _dataLock.ExitReadLock();
      }
    }
    publicTRead(intargument)
    {
      _dataLock.EnterReadLock();
      try
      {
        return_innerData.Read(argument);
      }
      finally
      {
        _dataLock.ExitReadLock();
      }
    }
    publicvoidAdd(Tinstance)
    {
      _dataLock.EnterWriteLock();
      try
      {
        _innerData.Write(0,instance);
      }
      finally
      {
        _dataLock.ExitWriteLock();
      }
    }
    publicvoidAdd(intargument,Tinstance)
    {
      _dataLock.EnterWriteLock();
      try
      {
        _innerData.Write(argument,instance);
      }
      finally
      {
        _dataLock.ExitWriteLock();
      }
    }
    publicboolAddWithTimeout(Tinstance,inttimeout)
    {
      if(_dataLock.TryEnterWriteLock(timeout))
      {
        try
        {
          _innerData.Write(0,instance);
        }
        finally
        {
          _dataLock.ExitWriteLock();
        }
        returntrue;
      }
      else
      {
        returnfalse;
      }
    }
    publicboolAddWithTimeout(intargument,Tinstance,inttimeout)
    {
      if(_dataLock.TryEnterWriteLock(timeout))
      {
        try
        {
          _innerData.Write(argument,instance);
        }
        finally
        {
          _dataLock.ExitWriteLock();
        }
        returntrue;
      }
      else
      {
        returnfalse;
      }
    }
    publicvoidDelete()
    {
      _dataLock.EnterWriteLock();
      try
      {
        _innerData.Delete(0);
      }
      finally
      {
        _dataLock.ExitWriteLock();
      }
    }
    publicvoidDelete(intargument)
    {
      _dataLock.EnterWriteLock();
      try
      {
        _innerData.Delete(argument);
      }
      finally
      {
        _dataLock.ExitWriteLock();
      }
    }
    #regionIDisposable成员
    publicvoidDispose()
    {
      try
      {
        _dataLock.EnterWriteLock();
        {
          try
          {
            _innerData.Clear();
          }
          finally
          {
            _dataLock.ExitWriteLock();
          }
        }
      }
      finally
      {
        _dataLock.Dispose();
      }
    }
    #endregion
  }
}
  
namespaceExternalDataHandle
{
  ///<summary>
  ///从外部数据源获取到内部数据源的适配器抽象类
  ///</summary>
  ///<typeparamname="T">T数据对象类型</typeparam>
  publicabstractclassExternalDataAdapter<T>:IDisposable
  {
    ///<summary>
    ///外部数据源连接字符串
    ///</summary>
    protectedabstractstringConnectString{get;}
    ///<summary>
    ///提供初始化数据适配器的方法
    ///</summary>
    protectedabstractvoidInitialize();
    ///<summary>
    ///提供数据传递的方法
    ///</summary>
    publicabstractvoidTransmit();
    ///<summary>
    ///提供从外部数据设备读取数据的方法
    ///</summary>
    protectedabstractvoidReadFromExternalDevice();
    ///<summary>
    ///提供保存数据到内部设备的方法
    ///</summary>
    protectedabstractvoidSaveToInternalDevice();
    #regionIDisposable成员
    publicabstractvoidDispose();
    #endregion
  }
}
  多线程数据转录类,本例只使用了一个读取线程:
namespaceExternalDataHandle
{
  ///<summary>
  ///提供多线程方式从外部数据源获取到内部数据源的适配器类
  ///</summary>
  ///<typeparamname="T"></typeparam>
  publicabstractclassMultiThreadAdapter<T>:ExternalDataAdapter<T>
  {
    protectedSynchronizedWriteData<T>_data;
    protectedThread_readThread;
    protectedabstractoverridestringConnectString{get;}
    protectedabstractoverridevoidInitialize();
    publicsealedoverridevoidTransmit()
    {
      _readThread=newThread(newThreadStart(ReadFromExternalDevice));
      _readThread.Start();
      Thread.Sleep(10000);
      while(_readThread.IsAlive)
      {
        SaveToInternalDevice();
      }
      _readThread.Join();
    }
    protectedabstractoverridevoidReadFromExternalDevice();
    protectedabstractoverridevoidSaveToInternalDevice();
    publicoverridevoidDispose()
    {
      if(_data!=null)
      {
        _data.Dispose();
      }
    }
  }
}
 


相关教程